Merge branch 'master' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class UnnestOperator extends AbstractUnnestOperator {
 
     private LogicalVariable positionalVariable;
+
+    /**
+     * Used to set the position offset for positional variable
+     */
+    private ILogicalExpression positionOffsetExpr;
+
+    /**
+     * Specify the type of the positional variable
+     */
     private Object positionalVariableType;
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
         return positionalVariableType;
     }
 
+    public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+        this.positionOffsetExpr = posOffsetExpr;
+    }
+
+    public ILogicalExpression getPositionOffsetExpr() {
+        return this.positionOffsetExpr;
+    }
+
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
         return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
         }
         return env;
     }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                if (sources.length > 0) {
+                    target.addAllVariables(sources[0]);
+                }
+                for (LogicalVariable v : variables) {
+                    target.addVariable(v);
+                }
+                if (positionalVariable != null) {
+                    target.addVariable(positionalVariable);
+                }
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index cebddee..3a2617f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -404,7 +404,8 @@
     @Override
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
-        propagateFDsAndEquivClasses(op, ctx);
+        ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+        ctx.putFDList(op, new ArrayList<FunctionalDependency>());
         return null;
     }
 
@@ -646,4 +647,4 @@
         return null;
     }
 
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 51b009f..bcd998d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -76,13 +78,19 @@
         }
         // compile subplans and set the gby op. schema accordingly
         AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
-        IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
-                fdColumns);
+        IAggregatorDescriptorFactory aggregatorFactory;
+
+        if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+            aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+        } else {
+            aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+        }
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
                 columnList, context.getTypeEnvironment(op), context);
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
 
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
                 comparatorFactories, aggregatorFactory, recordDescriptor);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 21def02..c5326ce 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -30,8 +30,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -62,22 +60,8 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        IPartitioningProperty pp = null;
-        RunningAggregateOperator ragg = (RunningAggregateOperator) op;
-        for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
-            StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
-            IPartitioningProperty p = f.getRequiredPartitioningProperty();
-            if (p != null) {
-                if (pp == null) {
-                    pp = p;
-                } else {
-                    throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
-                            + pp + " and " + p);
-                }
-            }
-        }
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
-        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+
+        return emptyUnaryRequirements();
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         UnnestOperator unnest = (UnnestOperator) op;
-        if (unnest.getPositionalVariable() != null) {
-            throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
-                    + unnest.getPositionalVariable());
-        }
+
         int outCol = opSchema.findVariable(unnest.getVariable());
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
         UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
         IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+        // for position offset
+        ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+        IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+        if (posOffsetExpr != null) {
+            posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
-        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+                unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
         ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index af11b70..82e6970 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class IntroHashPartitionMergeExchange implements IAlgebraicRewriteRule {
@@ -60,6 +61,7 @@
                 hpe.getDomain());
         op1.setPhysicalOperator(hpme);
         op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
+        op1.computeDeliveredPhysicalProperties(context);
         return true;
     }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 8f2eaac..09354a5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -28,11 +28,11 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private AlgebricksPipeline[] subplans;
@@ -95,7 +95,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
@@ -114,6 +114,7 @@
                     offset = fieldEnds[i] - start;
                     tupleBuilder.addField(data, start, offset);
                 }
+                return true;
             }
 
             @Override
@@ -127,7 +128,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
new file mode 100644
index 0000000..dc9c805
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private AlgebricksPipeline[] subplans;
+    private int[] keyFieldIdx;
+    private int[] decorFieldIdx;
+
+    public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
+        this.subplans = subplans;
+        this.keyFieldIdx = keyFieldIdx;
+        this.decorFieldIdx = decorFieldIdx;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[], int[])
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
+            final IFrameWriter writer) throws HyracksDataException {
+        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
+                decorFieldIdx.length, writer);
+        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            try {
+                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
+
+        final ByteBuffer outputFrame = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputFrame, true);
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+
+                for (int i = 0; i < pipelines.length; ++i) {
+                    pipelines[i].open();
+                }
+
+                gbyTb.reset();
+                for (int i = 0; i < keyFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, keyFieldIdx[i]);
+                }
+                for (int i = 0; i < decorFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, decorFieldIdx[i]);
+                }
+
+                // aggregate the first tuple
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                return false;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+    }
+
+    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+            throws AlgebricksException, HyracksDataException {
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+
+    private static class RunningAggregatorOutput implements IFrameWriter {
+
+        private FrameTupleAccessor[] tAccess;
+        private RecordDescriptor[] inputRecDesc;
+        private int inputIdx;
+        private ArrayTupleBuilder tb;
+        private ArrayTupleBuilder gbyTb;
+        private AlgebricksPipeline[] subplans;
+        private IFrameWriter outputWriter;
+        private ByteBuffer outputFrame;
+        private FrameTupleAppender outputAppender;
+
+        public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
+                int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
+            this.subplans = subplans;
+            this.outputWriter = outputWriter;
+
+            // this.keyFieldIndexes = keyFieldIndexes;
+            int totalAggFields = 0;
+            this.inputRecDesc = new RecordDescriptor[subplans.length];
+            for (int i = 0; i < subplans.length; i++) {
+                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+                this.inputRecDesc[i] = rd[rd.length - 1];
+                totalAggFields += subplans[i].getOutputWidth();
+            }
+            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+            gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
+
+            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+            for (int i = 0; i < inputRecDesc.length; i++) {
+                tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+            }
+
+            this.outputFrame = ctx.allocateFrame();
+            this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+            this.outputAppender.reset(outputFrame, true);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            int w = subplans[inputIdx].getOutputWidth();
+            IFrameTupleAccessor accessor = tAccess[inputIdx];
+            accessor.reset(buffer);
+            for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) {
+                tb.reset();
+                byte[] data = gbyTb.getByteArray();
+                int[] fieldEnds = gbyTb.getFieldEndOffsets();
+                int start = 0;
+                int offset = 0;
+                for (int i = 0; i < fieldEnds.length; i++) {
+                    if (i > 0)
+                        start = fieldEnds[i - 1];
+                    offset = fieldEnds[i] - start;
+                    tb.addField(data, start, offset);
+                }
+                for (int f = 0; f < w; f++) {
+                    tb.addField(accessor, tIndex, f);
+                }
+                if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, outputWriter);
+                    outputAppender.reset(outputFrame, true);
+                    if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new HyracksDataException(
+                                "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // clearFrame();
+        }
+
+        public void setInputIdx(int inputIdx) {
+            this.inputIdx = inputIdx;
+        }
+
+        public ArrayTupleBuilder getGroupByTupleBuilder() {
+            return gbyTb;
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index d4161fc..39db3e1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -25,11 +25,11 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private ICopySerializableAggregateFunctionFactory[] aggFactories;
 
@@ -109,7 +109,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 byte[] data = accessor.getBuffer().array();
                 int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -125,10 +125,11 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 byte[] data = accessor.getBuffer().array();
                 int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -144,6 +145,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 8e472ec..9b638b4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -25,11 +25,11 @@
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private IAggregateEvaluatorFactory[] aggFactories;
@@ -86,7 +86,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
                 for (int i = 0; i < agg.length; i++) {
@@ -97,6 +97,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
@@ -118,7 +119,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 6c1dd5e..b079c3e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,8 @@
             FrameUtils.flushFrame(frame, writer);
             appender.reset(frame, true);
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException(
-                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+                throw new HyracksDataException(
+                        "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
             }
         }
         if (flushFrame) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 200e5c5..7ecb288 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -63,8 +63,6 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
-                    outRecordDesc, groupFields, groupFields);
             final ByteBuffer copyFrame = ctx.allocateFrame();
             final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
             copyFrameAccessor.reset(copyFrame);
@@ -78,6 +76,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
+                    IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                            outRecordDesc, groupFields, groupFields, writer);
                     pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
                             outRecordDesc, writer);
                     pgw.open();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 0499684..e5bede2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -107,7 +107,8 @@
                             FrameUtils.flushFrame(frame, writer);
                             appender.reset(frame, true);
                             if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
-                                throw new IllegalStateException("Could not write frame.");
+                                throw new HyracksDataException(
+                                        "Could not write frame: subplan result is larger than the single-frame limit.");
                             }
                         }
                     }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5486fae..55dbcbb 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
 
@@ -64,5 +65,10 @@
         public void fail() throws HyracksDataException {
             writer.fail();
         }
+
+        public void forceFlush() throws HyracksDataException {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+        }
     }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index e92724a..cfc2bb6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -89,6 +90,10 @@
 
             @Override
             public void open() throws HyracksDataException {
+                if(!first){
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                }
                 initAccessAppendRef(ctx);
                 if (first) {
                     first = false;
@@ -96,12 +101,18 @@
                     for (int i = 0; i < n; i++) {
                         try {
                             raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
-                            raggs[i].init();
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }
                     }
                 }
+                for (int i = 0; i < runningAggregates.length; i++) {
+                    try {
+                        raggs[i].init();
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
                 writer.open();
             }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..88dc19f 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
  */
 package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -36,12 +41,20 @@
     private int outColPos;
     private final boolean outColIsProjected;
 
+    private final boolean hasPositionalVariable;
+    private IScalarEvaluatorFactory posOffsetEvalFactory;
+
     // Each time step() is called on the aggregate, a new value is written in
     // its output. One byte is written before that value and is neglected.
     // By convention, if the aggregate function writes nothing, it means it
     // produced the last value.
 
     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+        this(outCol, unnestingFactory, projectionList, false, null);
+    }
+
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+            boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
         super(projectionList);
         this.outCol = outCol;
         this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
             }
         }
         outColIsProjected = outColPos >= 0;
+        this.hasPositionalVariable = hashPositionalVariable;
+        this.posOffsetEvalFactory = posOffsetEvalFactory;
+        if (this.posOffsetEvalFactory == null) {
+            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+        }
     }
 
     @Override
@@ -68,6 +86,9 @@
             private IUnnestingEvaluator agg;
             private ArrayTupleBuilder tupleBuilder;
 
+            private int tupleCount;
+            private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
             @Override
             public void open() throws HyracksDataException {
                 initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                tupleCount = 1;
                 writer.open();
             }
 
@@ -86,6 +108,16 @@
                 int nTuple = tAccess.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
+
+                    try {
+                        offsetEval.evaluate(tRef, p);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+
+                    @SuppressWarnings("static-access")
+                    int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
                     try {
                         agg.init(tRef);
                         boolean goon = true;
@@ -94,21 +126,33 @@
                             if (!agg.step(p)) {
                                 goon = false;
                             } else {
-                                if (!outColIsProjected) {
+
+                                if (!outColIsProjected && !hasPositionalVariable) {
                                     appendProjectionToFrame(t, projectionList);
                                 } else {
                                     for (int f = 0; f < outColPos; f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
-                                    tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                                    for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                    if (outColIsProjected) {
+                                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                                    } else {
+                                        tupleBuilder.addField(tAccess, t, outColPos);
+                                    }
+                                    for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+                                            : projectionList.length); f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
                                 }
+                                if (hasPositionalVariable) {
+                                    // Write the positional variable as an INT32
+                                    tupleBuilder.getDataOutput().writeByte(3);
+                                    tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+                                    tupleBuilder.addFieldEndOffset();
+                                }
                                 appendToFrameFromTupleBuilder(tupleBuilder);
                             }
                         } while (goon);
-                    } catch (AlgebricksException ae) {
+                    } catch (AlgebricksException | IOException ae) {
                         throw new HyracksDataException(ae);
                     }
                 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index c75f9f9..1af32de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,4 +25,6 @@
     public IIOManager getIOManager();
 
     public ByteBuffer allocateFrame() throws HyracksDataException;
+    
+    public void deallocateFrames(int frameCount);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index aa7008f..3ea81ef 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -41,4 +41,10 @@
         return ByteBuffer.allocate(frameSize);
     }
 
+    @Override
+    public void deallocateFrames(int frameCount) {
+        // TODO Auto-generated method stub
+        
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index c72ced1..20075ff 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -115,6 +115,11 @@
     }
 
     @Override
+    public void deallocateFrames(int frameCount) {
+        joblet.deallocateFrames(frameCount);
+    }
+
+    @Override
     public int getFrameSize() {
         return joblet.getFrameSize();
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
index 8add260..4829173 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
@@ -26,10 +26,11 @@
 
     @Override
     public boolean allocate(long memory) {
-        if (this.memory.addAndGet(-memory) < 0) {
-            this.memory.addAndGet(memory);
-            return false;
-        }
+        // commented as now the deallocation is not implemented yet.
+        //        if (this.memory.addAndGet(-memory) < 0) {
+        //            this.memory.addAndGet(memory);
+        //            return false;
+        //        }
         return true;
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..48ee410
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractAccumulatingAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[], int[], edu.uci.ics.hyracks.api.comm.IFrameWriter)
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+            throws HyracksDataException {
+        return this
+                .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+    }
+
+    abstract protected IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
+            final int[] keyFieldsInPartialResults) throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
new file mode 100644
index 0000000..3dcd4c3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor#outputPartialResult(edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor, int, edu.uci.ics.hyracks.dataflow.std.group.AggregateState)
+     */
+    @Override
+    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor#outputFinalResult(edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor, int, edu.uci.ics.hyracks.dataflow.std.group.AggregateState)
+     */
+    @Override
+    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        return false;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index ecd5284..df74a93 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -38,9 +38,6 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
-/**
- *
- */
 public class HashSpillableTableFactory implements ISpillableTableFactory {
 
     private static final long serialVersionUID = 1L;
@@ -104,7 +101,7 @@
         }
 
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+                outRecordDescriptor, keyFields, keyFieldsInPartialResults, null);
 
         final AggregateState aggregateState = aggregator.createAggregateStates();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index afe460c..f9d4cd3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -18,9 +18,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-/**
- *
- */
 public interface IAggregatorDescriptor {
 
     /**
@@ -82,9 +79,10 @@
      * @param offset
      * @param state
      *            The aggregation state.
+     * @return TODO
      * @throws HyracksDataException
      */
-    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     /**
@@ -97,9 +95,10 @@
      * @param offset
      * @param state
      *            The aggregation state.
+     * @return TODO
      * @throws HyracksDataException
      */
-    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     public void close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a4c0564..241956b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -16,17 +16,15 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-/**
- *
- */
 public interface IAggregatorDescriptorFactory extends Serializable {
 
     IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
-            throws HyracksDataException;
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+            IFrameWriter writer) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 239f24f..08b2218 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,16 +21,14 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
-/**
- *
- */
-public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
@@ -78,8 +76,8 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -90,11 +88,11 @@
                             ((AggregateState[]) state.state)[i]);
                     tupleBuilder.addFieldEndOffset();
                 }
-
+                return true;
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
@@ -110,6 +108,7 @@
                     }
                     tupleBuilder.addFieldEndOffset();
                 }
+                return true;
             }
 
             @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index bb3d0f3..fdb59a4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -95,7 +95,7 @@
         }
 
         aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
-                keyFieldsInPartialResults);
+                keyFieldsInPartialResults, writer);
         aggregateState = aggregator.createAggregateStates();
 
         storedKeys = new int[keyFields.length];
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index f8ba003..3c0eb2b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -121,7 +121,7 @@
         }
 
         this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
-                keyFieldsInPartialResults);
+                keyFieldsInPartialResults, null);
 
         this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
         accumulatorSize = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 35cb9e1..71af928 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -55,7 +55,7 @@
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, groupFields, groupFields);
+                outRecordDescriptor, groupFields, groupFields, writer);
         final ByteBuffer copyFrame = ctx.allocateFrame();
         final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
         copyFrameAccessor.reset(copyFrame);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7773765..c7a9f6d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -44,6 +44,8 @@
 
     private boolean first;
 
+    private boolean isFailed = false;
+
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
             IFrameWriter writer) throws HyracksDataException {
@@ -119,9 +121,9 @@
         for (int j = 0; j < groupFields.length; j++) {
             tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
         }
-        aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+        boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
-        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+        if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                 tupleBuilder.getSize())) {
             FrameUtils.flushFrame(outFrame, writer);
             appender.reset(outFrame, true);
@@ -149,12 +151,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        isFailed = true;
         writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-        if (!first) {
+        if (!isFailed && !first) {
             writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
             if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 1bd22d7..5ac2961 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -19,6 +19,8 @@
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -83,7 +85,11 @@
         TestStorageManagerComponentHolder.init(8192, 20, 20);
     }
 
-    protected static final int MERGE_THRESHOLD = 3;
+    protected static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
 
     protected IVirtualBufferCacheProvider virtualBufferCacheProvider = new TestVirtualBufferCacheProvider(
             DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
index fc932a3..8addae7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -52,7 +52,7 @@
                 PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
 
         invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
-                virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+                virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
                 ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
                 NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index 75045b4..82de0c9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -24,7 +24,7 @@
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -47,9 +47,9 @@
                 PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
 
         invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(virtualBufferCacheProvider,
-                new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
-                SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
-                DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
+                new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
+                ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+                NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
 
     @Override
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index e98ecde..a4fd4c1 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -15,10 +15,13 @@
 
 package edu.uci.ics.hyracks.tests.am.lsm.btree;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -26,15 +29,19 @@
 
 public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
 
-    private static final int MERGE_THRESHOLD = 3;
-
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
+    
     public LSMBTreeOperatorTestHelper(IOManager ioManager) {
         super(ioManager);
     }
 
     public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
-        return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyProvider(
-                MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
+        return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+                MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
                 SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
                 DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index e7478d6..79b8eb7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.tests.am.lsm.rtree;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
 
 public class LSMRTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
 
-    private static final int MERGE_THRESHOLD = 3;
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
 
     public LSMRTreeOperatorTestHelper(IOManager ioManager) {
         super(ioManager);
@@ -40,7 +47,7 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
         return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
-                virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+                virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
                 ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
                 NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index f883d90..fd6171a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.tests.am.lsm.rtree;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
 
 public class LSMRTreeWithAntiMatterTuplesOperatorTestHelper extends LSMTreeOperatorTestHelper {
 
-    private static final int MERGE_THRESHOLD = 3;
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
 
     public LSMRTreeWithAntiMatterTuplesOperatorTestHelper(IOManager ioManager) {
         super(ioManager);
@@ -40,9 +47,8 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
         return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
-                btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
-                ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
-                NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
+                btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+                MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
+                SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
     }
-
 }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index 6e17ff8..9c2268d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -23,21 +23,20 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 
 /**
- * Quick & dirty data generator for multi-thread testing. 
- *
+ * Quick & dirty data generator for multi-thread testing.
  */
 @SuppressWarnings("rawtypes")
 public class DataGenThread extends Thread {
     public final BlockingQueue<TupleBatch> tupleBatchQueue;
     private final int maxNumBatches;
-    private final int maxOutstandingBatches;        
+    private final int maxOutstandingBatches;
     private int numBatches = 0;
     private final Random rnd;
-    
+
     // maxOutstandingBatches pre-created tuple-batches for populating the queue.
     private TupleBatch[] tupleBatches;
     private int ringPos;
-    
+
     public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
             int payloadSize, int rndSeed, int maxOutstandingBatches, boolean sorted) {
         this.maxNumBatches = maxNumBatches;
@@ -51,7 +50,7 @@
         tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
         ringPos = 0;
     }
-    
+
     public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
             IFieldValueGenerator[] fieldGens, int rndSeed, int maxOutstandingBatches) {
         this.maxNumBatches = maxNumBatches;
@@ -64,13 +63,13 @@
         tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
         ringPos = 0;
     }
-    
+
     @Override
     public void run() {
-        while(numBatches < maxNumBatches) {
+        while (numBatches < maxNumBatches) {
             boolean added = false;
             try {
-                if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {                    
+                if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
                     tupleBatches[ringPos].generate();
                     tupleBatchQueue.put(tupleBatches[ringPos]);
                     added = true;
@@ -89,11 +88,11 @@
             }
         }
     }
-    
+
     public TupleBatch getBatch() throws InterruptedException {
         return tupleBatchQueue.take();
     }
-    
+
     public void releaseBatch(TupleBatch batch) {
         batch.inUse.set(false);
     }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
index d7234fa..cb9c545 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 
-@SuppressWarnings("rawtypes") 
+@SuppressWarnings("rawtypes")
 public class DataGenUtils {
     public static IFieldValueGenerator getFieldGenFromSerde(ISerializerDeserializer serde, Random rnd, boolean sorted) {
         if (serde instanceof IntegerSerializerDeserializer) {
@@ -49,8 +49,9 @@
         }
         return null;
     }
-    
-    public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd, boolean sorted) {
+
+    public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd,
+            boolean sorted) {
         IFieldValueGenerator[] fieldValueGens = new IFieldValueGenerator[serdes.length];
         for (int i = 0; i < serdes.length; i++) {
             fieldValueGens[i] = getFieldGenFromSerde(serdes[i], rnd, sorted);
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 91207e8..b5c0a1d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -25,5 +25,6 @@
     PHYSICALDELETE,
     NOOP,
     MERGE,
+    FULL_MERGE,
     FLUSH
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index 104a70d..0fdef13 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -15,12 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
     private static final long serialVersionUID = 1L;
 
     public LSMBTreeDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
     }
 
     @Override
@@ -42,7 +44,7 @@
             int partition) {
         return new LSMBTreeDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 33ae7bb..28f44e6 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -54,7 +54,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -164,8 +163,7 @@
         }
 
         if (flushOnExit) {
-            BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
-                    ioOpCallback);
+            BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
             ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             accessor.scheduleFlush(cb);
             try {
@@ -242,10 +240,10 @@
     public void getOperationalComponents(ILSMIndexOperationContext ctx) {
         List<ILSMComponent> immutableComponents = diskComponents;
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        operationalComponents.clear();
         int cmc = currentMutableComponentId.get();
         ctx.setCurrentMutableComponentId(cmc);
         int numMutableComponents = memoryComponents.size();
+        operationalComponents.clear();
         switch (ctx.getOperation()) {
             case UPDATE:
             case UPSERT:
@@ -256,6 +254,7 @@
                 break;
             case SEARCH:
             case INSERT:
+
                 for (int i = 0; i < numMutableComponents - 1; i++) {
                     ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
                     LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
@@ -269,6 +268,9 @@
                 operationalComponents.addAll(immutableComponents);
                 break;
             case MERGE:
+                operationalComponents.addAll(ctx.getComponentsToBeMerged());
+                break;
+            case FULL_MERGE:
                 operationalComponents.addAll(immutableComponents);
                 break;
             default:
@@ -366,7 +368,8 @@
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
-                .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
+                .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager
+                .getBaseDir()));
     }
 
     @Override
@@ -423,8 +426,12 @@
         LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         opCtx.setOperation(IndexOperation.MERGE);
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
-        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
-
+        boolean returnDeletedTuples = false;
+        if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+                .size() - 1)) {
+            returnDeletedTuples = true;
+        }
+        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
         BTree firstBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(0)).getBTree();
         BTree lastBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1))
                 .getBTree();
@@ -434,7 +441,8 @@
                 .getName(), lastFile.getFile().getName());
         ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
-                .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+                .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager
+                .getBaseDir()));
     }
 
     @Override
@@ -455,8 +463,8 @@
         int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
         BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
                 bloomFilterFalsePositiveRate);
-        LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory,
-                mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
+        LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+                mergeOp.getBloomFilterMergeTarget(), true);
 
         IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
         IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
@@ -476,9 +484,8 @@
         return mergedComponent;
     }
 
-    private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
-            FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
-            throws HyracksDataException, IndexException {
+    private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory, FileReference btreeFileRef,
+            FileReference bloomFilterFileRef, boolean createComponent) throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
                 .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -541,8 +548,8 @@
             } catch (HyracksDataException | IndexException e) {
                 throw new TreeIndexException(e);
             }
-            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(
-                    fillFactor, verifyInput, numElementsHint, false);
+            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
+                    verifyInput, numElementsHint, false);
 
             int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
             BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0b2d7cf..381b012 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -43,4 +43,9 @@
     public BloomFilter getBloomFilter() {
         return bloomFilter;
     }
+
+    @Override
+    public long getComponentSize() {
+        return btree.getFileReference().getFile().length() + bloomFilter.getFileReference().getFile().length();
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 668e727..94b1569 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -27,21 +27,24 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 
-public class LSMBTreeFlushOperation implements ILSMIOOperation {
+public class LSMBTreeFlushOperation implements ILSMIOOperation, Comparable<LSMBTreeFlushOperation> {
 
     private final ILSMIndexAccessorInternal accessor;
     private final ILSMComponent flushingComponent;
     private final FileReference btreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
-            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
+            String indexIdentifier) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -83,4 +86,19 @@
     public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.FLUSH;
+    }
+
+    @Override
+    public int compareTo(LSMBTreeFlushOperation o) {
+        return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 3a608fe..a4f6875 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -37,16 +37,18 @@
     private final FileReference btreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
             ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
-            ILSMIOOperationCallback callback) {
+            ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
         this.btreeMergeTarget = btreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -94,4 +96,14 @@
     public List<ILSMComponent> getMergingComponents() {
         return mergingComponents;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.MERGE;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6d2d7c0..cb7bae7 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -48,6 +48,7 @@
     public final IModificationOperationCallback modificationCallback;
     public final ISearchOperationCallback searchCallback;
     private final List<ILSMComponent> componentHolder;
+    private final List<ILSMComponent> componentsToBeMerged;
 
     public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
@@ -84,6 +85,7 @@
             deleteLeafFrame.setMultiComparator(cmp);
         }
         this.componentHolder = new LinkedList<ILSMComponent>();
+        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
         this.modificationCallback = modificationCallback;
         this.searchCallback = searchCallback;
     }
@@ -107,6 +109,7 @@
     @Override
     public void reset() {
         componentHolder.clear();
+        componentsToBeMerged.clear();
     }
 
     public IndexOperation getOperation() {
@@ -153,4 +156,9 @@
                 break;
         }
     }
+
+    @Override
+    public List<ILSMComponent> getComponentsToBeMerged() {
+        return componentsToBeMerged;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 6eada4b..668a12c 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -49,7 +49,11 @@
     private boolean proceed = true;
 
     public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
-        super(opCtx);
+        this(opCtx, false);
+    }
+
+    public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+        super(opCtx, returnDeletedTuples);
         this.copyTuple = new ArrayTupleReference();
         this.reusablePred = new RangePredicate(null, null, true, true, null, null);
     }
@@ -126,7 +130,7 @@
                 }
                 // If there is no previous tuple or the previous tuple can be ignored
                 if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
+                    if (isDeleted(checkElement) && !returnDeletedTuples) {
                         // If the key has been deleted then pop it and set needPush to true.
                         // We cannot push immediately because the tuple may be
                         // modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index bc7cbf7..1903998 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -37,6 +37,9 @@
     public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
+    public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException, IndexException;
+
     public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
             IndexException;
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 2c3940f..1638a5d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -22,6 +22,12 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMIOOperation extends Callable<Boolean> {
+
+    public enum LSMIOOpertionType {
+        FLUSH,
+        MERGE
+    }
+
     public Set<IODeviceHandle> getReadDevices();
 
     public Set<IODeviceHandle> getWriteDevices();
@@ -29,4 +35,8 @@
     public Boolean call() throws HyracksDataException, IndexException;
 
     public ILSMIOOperationCallback getCallback();
+
+    public String getIndexUniqueIdentifier();
+
+    public LSMIOOpertionType getIOOpertionType();
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 3405b60..36a2ca1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -30,7 +32,10 @@
 public interface ILSMIndexAccessor extends IIndexAccessor {
     public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
 
-    public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+            throws HyracksDataException, IndexException;
+
+    public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
 
     /**
      * Deletes the tuple from the memory component only.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index fcd4037..80264bc 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -22,6 +22,8 @@
 
 public interface ILSMIndexOperationContext extends IIndexOperationContext {
     public List<ILSMComponent> getComponentHolder();
+    
+    public List<ILSMComponent> getComponentsToBeMerged();
 
     public ISearchOperationCallback getSearchOperationCallback();
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 1473071..279605f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -15,9 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMMergePolicy {
-    public void diskComponentAdded(ILSMIndex index) throws HyracksDataException, IndexException;
+    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException;
+
+    public void configure(Map<String, String> properties);
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
similarity index 73%
rename from hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
rename to hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index cf56750..c90b5f7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -15,9 +15,13 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
 import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
 
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public interface ILSMMergePolicyFactory extends Serializable {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration);
 
-public interface ILSMMergePolicyProvider extends Serializable {
-    public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx);
-}
+    public String getName();
+
+    public Set<String> getPropertiesNames();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
index 2c082bb..4276ba7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
@@ -15,10 +15,12 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 
@@ -26,21 +28,23 @@
     protected static final long serialVersionUID = 1L;
 
     protected final IVirtualBufferCacheProvider virtualBufferCacheProvider;
-    protected final ILSMMergePolicyProvider mergePolicyProvider;
+    protected final ILSMMergePolicyFactory mergePolicyFactory;
+    protected final Map<String, String> mergePolicyProperties;
     protected final ILSMOperationTrackerProvider opTrackerFactory;
     protected final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
     protected final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
     protected final double bloomFilterFalsePositiveRate;
 
     public AbstractLSMIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
         this.virtualBufferCacheProvider = virtualBufferCacheProvider;
-        this.mergePolicyProvider = mergePolicyProvider;
+        this.mergePolicyFactory = mergePolicyFactory;
         this.opTrackerFactory = opTrackerFactory;
         this.ioSchedulerProvider = ioSchedulerProvider;
         this.ioOpCallbackFactory = ioOpCallbackFactory;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+        this.mergePolicyProperties = mergePolicyProperties;
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
new file mode 100644
index 0000000..a61dec4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
+    private final IIndexDataflowHelper indexHelper;
+
+    public LSMIndexCompactOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition) {
+        this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        indexHelper.close();
+    }
+
+    @Override
+    public int getInputArity() {
+        return 0;
+    }
+
+    @Override
+    public IFrameWriter getInputFrameWriter(int index) {
+        return null;
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        indexHelper.open();
+        ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+        ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+        try {
+            accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+        } catch (Exception e) {
+            indexHelper.close();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
new file mode 100644
index 0000000..3c40f94
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMTreeIndexCompactOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
+            IIndexDataflowHelperFactory dataflowHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+        super(spec, 0, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                modificationOpCallbackProvider);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index bc6baeb..ec12c23 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -85,4 +85,6 @@
 
     protected abstract void destroy() throws HyracksDataException;
 
+    public abstract long getComponentSize();
+
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 25894f1..b785764 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -14,24 +14,87 @@
  */
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOpertionType;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 
 public class AsynchronousScheduler implements ILSMIOOperationScheduler {
+    // Since this is a asynchronous scheduler, we make sure that flush operations coming from the same lsm index
+    // will be executed serially in same order of scheduling the operations. Look at asterix issue 630.
+
     public final static AsynchronousScheduler INSTANCE = new AsynchronousScheduler();
     private ExecutorService executor;
+    private final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<String, ILSMIOOperation>();
+    private final Map<String, PriorityQueue<ILSMIOOperation>> waitingFlushOperations = new HashMap<String, PriorityQueue<ILSMIOOperation>>();
 
     public void init(ThreadFactory threadFactory) {
-        executor = Executors.newCachedThreadPool(threadFactory);
+        // Creating an executor with the same configuration of Executors.newCachedThreadPool. 
+        executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), threadFactory) {
+
+            @Override
+            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+                return new LSMIOOperationTask<T>(callable);
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            protected void afterExecute(Runnable r, Throwable t) {
+                super.afterExecute(r, t);
+                LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
+                ILSMIOOperation executedOp = task.getOperation();
+                String id = executedOp.getIndexUniqueIdentifier();
+                synchronized (this) {
+                    runningFlushOperations.remove(id);
+                    if (waitingFlushOperations.containsKey(id)) {
+                        try {
+                            ILSMIOOperation op = waitingFlushOperations.get(id).poll();
+                            if (op != null) {
+                                scheduleOperation(op);
+                            } else {
+                                waitingFlushOperations.remove(id);
+                            }
+                        } catch (HyracksDataException e) {
+                            t = e.getCause();
+                        }
+                    }
+                }
+            }
+        };
     }
 
     @Override
     public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
-        executor.submit(operation);
+        if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
+            executor.submit(operation);
+        } else {
+            String id = operation.getIndexUniqueIdentifier();
+            synchronized (executor) {
+                if (runningFlushOperations.containsKey(id)) {
+                    if (waitingFlushOperations.containsKey(id)) {
+                        waitingFlushOperations.get(id).offer(operation);
+                    } else {
+                        PriorityQueue<ILSMIOOperation> q = new PriorityQueue<ILSMIOOperation>();
+                        q.offer(operation);
+                        waitingFlushOperations.put(id, q);
+                    }
+                } else {
+                    runningFlushOperations.put(id, operation);
+                    executor.submit(operation);
+                }
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index b6f5657..8a6b8bd 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -15,27 +15,43 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.List;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
 public class ConstantMergePolicy implements ILSMMergePolicy {
+    private int numComponents;
 
-    private final int threshold;
-
-    public ConstantMergePolicy(int threshold) {
-        this.threshold = threshold;
+    @Override
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException {
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return;
+            }
+        }
+        if (fullMergeIsRequested) {
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleFullMerge(index.getIOOperationCallback());
+        } else if (immutableComponents.size() >= numComponents) {
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+        }
     }
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
-        if (index.getImmutableComponents().size() >= threshold) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
-        }
+    public void configure(Map<String, String> properties) {
+        numComponents = Integer.parseInt(properties.get("num-components"));
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
new file mode 100644
index 0000000..13f5ad9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class ConstantMergePolicyFactory implements ILSMMergePolicyFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String[] SET_VALUES = new String[] { "num-components" };
+    private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+    @Override
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+        ILSMMergePolicy policy = new ConstantMergePolicy();
+        policy.configure(properties);
+        return policy;
+    }
+
+    @Override
+    public String getName() {
+        return "constant";
+    }
+
+    @Override
+    public Set<String> getPropertiesNames() {
+        return PROPERTIES_NAMES;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
deleted file mode 100644
index a7383c1..0000000
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-
-public class ConstantMergePolicyProvider implements ILSMMergePolicyProvider {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int threshold;
-
-    public ConstantMergePolicyProvider(int threshold) {
-        this.threshold = threshold;
-    }
-
-    @Override
-    public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx) {
-        return new ConstantMergePolicy(threshold);
-    }
-
-}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index ca775b7..443ad2b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -41,11 +42,13 @@
     private final ILSMIndexInternal lsmIndex;
     private final ILSMMergePolicy mergePolicy;
     private final ILSMOperationTracker opTracker;
+    private final AtomicBoolean fullMergeIsRequested;
 
     public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
         this.lsmIndex = lsmIndex;
         this.opTracker = opTracker;
         this.mergePolicy = mergePolicy;
+        fullMergeIsRequested = new AtomicBoolean();
     }
 
     private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean isTryOperation)
@@ -180,13 +183,14 @@
                         // newComponent is null if the flush op. was not performed.
                         if (newComponent != null) {
                             lsmIndex.addComponent(newComponent);
-                            mergePolicy.diskComponentAdded(lsmIndex);
+                            mergePolicy.diskComponentAdded(lsmIndex, false);
                         }
                         break;
                     case MERGE:
                         // newComponent is null if the merge op. was not performed.
                         if (newComponent != null) {
                             lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+                            mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
                         }
                         break;
                     default:
@@ -287,7 +291,6 @@
     @Override
     public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException {
-        // Merge should always be a try operation, because it should never fail to enter the components unless the merge policy is erroneous.
         if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
             callback.afterFinalize(LSMOperationType.MERGE, null);
             return;
@@ -296,6 +299,20 @@
     }
 
     @Override
+    public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException, IndexException {
+        fullMergeIsRequested.set(true);
+        if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+            // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
+            // whenever the current merge has finished, it will schedule the full merge again.
+            callback.afterFinalize(LSMOperationType.MERGE, null);
+            return;
+        }
+        fullMergeIsRequested.set(false);
+        lsmIndex.scheduleMerge(ctx, callback);
+    }
+
+    @Override
     public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
             IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -320,7 +337,7 @@
     public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
         lsmIndex.markAsValid(c);
         lsmIndex.addComponent(c);
-        mergePolicy.diskComponentAdded(lsmIndex);
+        mergePolicy.diskComponentAdded(lsmIndex, false);
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
new file mode 100644
index 0000000..9743afe
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+
+public class LSMIOOperationTask<T> extends FutureTask<T> {
+    private final ILSMIOOperation operation;
+
+    public LSMIOOperationTask(Callable<T> callable) {
+        super(callable);
+        this.operation = (ILSMIOOperation) callable;
+    }
+
+    public ILSMIOOperation getOperation() {
+        return operation;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 45cc69b..2bc45a9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -42,11 +42,13 @@
     protected boolean includeMutableComponent;
     protected ILSMHarness lsmHarness;
     protected final ILSMIndexOperationContext opCtx;
+    protected final boolean returnDeletedTuples;
 
     protected List<ILSMComponent> operationalComponents;
 
-    public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
+    public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
         this.opCtx = opCtx;
+        this.returnDeletedTuples = returnDeletedTuples;
         outputElement = null;
         needPush = false;
     }
@@ -110,14 +112,14 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (lsmHarness != null) {
-            try {
-                outputPriorityQueue.clear();
-                for (int i = 0; i < rangeCursors.length; i++) {
-                    rangeCursors[i].close();
-                }
-                rangeCursors = null;
-            } finally {
+        try {
+            outputPriorityQueue.clear();
+            for (int i = 0; i < rangeCursors.length; i++) {
+                rangeCursors[i].close();
+            }
+            rangeCursors = null;
+        } finally {
+            if (lsmHarness != null) {
                 lsmHarness.endSearch(opCtx);
             }
         }
@@ -154,7 +156,50 @@
         return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
     }
 
-    abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
+    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+            if (!outputPriorityQueue.isEmpty()) {
+                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputElement == null) {
+                    if (isDeleted(checkElement) && !returnDeletedTuples) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called
+                        outputElement = outputPriorityQueue.poll();
+                        needPush = true;
+                    } else {
+                        break;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
+
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = outputPriorityQueue.poll();
+                        pushIntoPriorityQueue(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPush == true) {
+                            pushIntoPriorityQueue(outputElement);
+                            needPush = false;
+                        }
+                        outputElement = null;
+                    }
+                }
+            } else {
+                // the priority queue is empty and needPush
+                pushIntoPriorityQueue(outputElement);
+                needPush = false;
+                outputElement = null;
+            }
+        }
+    }
 
     @Override
     public boolean exclusiveLatchNodes() {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f11a061..c828bd2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -116,12 +119,21 @@
     }
 
     @Override
-    public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+            throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.MERGE);
+        ctx.getComponentsToBeMerged().clear();
+        ctx.getComponentsToBeMerged().addAll(components);
         lsmHarness.scheduleMerge(ctx, callback);
     }
 
     @Override
+    public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+        ctx.setOperation(IndexOperation.FULL_MERGE);
+        lsmHarness.scheduleFullMerge(ctx, callback);
+    }
+
+    @Override
     public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.PHYSICALDELETE);
         lsmHarness.forceModify(ctx, tuple);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 17d1b17..ca22268 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
@@ -21,8 +23,12 @@
     INSTANCE;
 
     @Override
-    public void diskComponentAdded(ILSMIndex index) {
+    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) {
         // Do nothing
     }
 
+    @Override
+    public void configure(Map<String, String> properties) {
+        // Do nothing
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
new file mode 100644
index 0000000..fe04db6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+public class PrefixMergePolicy implements ILSMMergePolicy {
+
+    private long maxMergableComponentSize;
+    private int maxTolernaceComponentCount;
+
+    @Override
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException {
+        // 1.  Look at the candidate components for merging in oldest-first order.  If one exists, identify the prefix of the sequence of
+        // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule a merge of those components into a new component.
+        // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt.  If so, schedule
+        // a merge all of the current candidates into a new single component.
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return;
+            }
+        }
+        if (fullMergeIsRequested) {
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleFullMerge(index.getIOOperationCallback());
+            return;
+        }
+        long totalSize = 0;
+        int startIndex = -1;
+        for (int i = 0; i < immutableComponents.size(); i++) {
+            ILSMComponent c = immutableComponents.get(i);
+            long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+            if (componentSize > maxMergableComponentSize) {
+                startIndex = i;
+                totalSize = 0;
+                continue;
+            }
+            totalSize += componentSize;
+            boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
+            if (totalSize > maxMergableComponentSize
+                    || (isLastComponent && i - startIndex >= maxTolernaceComponentCount)) {
+                List<ILSMComponent> mergableCopments = new ArrayList<ILSMComponent>();
+                for (int j = startIndex + 1; j <= i; j++) {
+                    mergableCopments.add(immutableComponents.get(j));
+                }
+                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
+                accessor.scheduleMerge(index.getIOOperationCallback(), mergableCopments);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void configure(Map<String, String> properties) {
+        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+        maxTolernaceComponentCount = Integer.parseInt(properties.get("max-tolernace-component-count"));
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
new file mode 100644
index 0000000..981ec6c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class PrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
+            "max-tolernace-component-count" };
+    private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+    @Override
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+        ILSMMergePolicy policy = new PrefixMergePolicy();
+        policy.configure(properties);
+        return policy;
+    }
+
+    @Override
+    public String getName() {
+        return "prefix";
+    }
+
+    @Override
+    public Set<String> getPropertiesNames() {
+        return PROPERTIES_NAMES;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
new file mode 100644
index 0000000..22e7505
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexCompactOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMInvertedIndexCompactOperator extends AbstractLSMInvertedIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public LSMInvertedIndexCompactOperator(IOperatorDescriptorRegistry spec, IStorageManagerInterface storageManager,
+            IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+            ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory dataflowHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+        super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+                tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+                dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
index 84c7150..c018f16 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
     private static final long serialVersionUID = 1L;
 
     public LSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
     }
 
     @Override
@@ -42,8 +44,8 @@
             int partition) {
         return new LSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
     }
 
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
index d78ae7e..aef0863 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
     private static final long serialVersionUID = 1L;
 
     public PartitionedLSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
     }
 
     @Override
@@ -42,7 +44,7 @@
             int partition) {
         return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index cef6aee..93c5a58 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -267,10 +267,10 @@
     public void getOperationalComponents(ILSMIndexOperationContext ctx) {
         List<ILSMComponent> immutableComponents = diskComponents;
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        operationalComponents.clear();
         int cmc = currentMutableComponentId.get();
         ctx.setCurrentMutableComponentId(cmc);
         int numMutableComponents = memoryComponents.size();
+        operationalComponents.clear();
         switch (ctx.getOperation()) {
             case FLUSH:
             case DELETE:
@@ -291,8 +291,10 @@
                 operationalComponents.addAll(immutableComponents);
                 break;
             case MERGE:
-                operationalComponents.addAll(immutableComponents);
+                operationalComponents.addAll(ctx.getComponentsToBeMerged());
                 break;
+            case FULL_MERGE:
+                operationalComponents.addAll(immutableComponents);
             default:
                 throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
         }
@@ -436,7 +438,7 @@
         ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(
                 new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
                         .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
-                componentFileRefs.getBloomFilterFileReference(), callback));
+                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -541,7 +543,7 @@
         ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(lsmHarness, ctx);
         ioScheduler.scheduleOperation(new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
                 relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
-                relMergeFileRefs.getBloomFilterFileReference(), callback));
+                relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -561,6 +563,31 @@
                 mergeOp.getBloomFilterMergeTarget(), true);
 
         IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+
+        // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
+        // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
+        if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+                .get(diskComponents.size() - 1)) {
+            // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+            LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = new LSMInvertedIndexDeletedKeysBTreeMergeCursor(
+                    opCtx);
+            search(opCtx, btreeCursor, mergePred);
+
+            BTree btree = component.getDeletedKeysBTree();
+            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+            try {
+                while (btreeCursor.hasNext()) {
+                    btreeCursor.next();
+                    ITupleReference tuple = btreeCursor.getTuple();
+                    btreeBulkLoader.add(tuple);
+                }
+            } finally {
+                btreeCursor.close();
+            }
+            btreeBulkLoader.end();
+        }
+
         IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
         try {
             while (cursor.hasNext()) {
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index e31af9a..7e34dfe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -84,12 +87,21 @@
     }
 
     @Override
-    public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+            throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.MERGE);
+        ctx.getComponentsToBeMerged().clear();
+        ctx.getComponentsToBeMerged().addAll(components);
         lsmHarness.scheduleMerge(ctx, callback);
     }
 
     @Override
+    public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+        ctx.setOperation(IndexOperation.FULL_MERGE);
+        lsmHarness.scheduleFullMerge(ctx, callback);
+    }
+
+    @Override
     public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
         lsmHarness.merge(ctx, operation);
     }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..3efaaba
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+    public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+        super(opCtx, true);
+    }
+
+    @Override
+    protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+        return false;
+    }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+            IndexException {
+        LSMInvertedIndexRangeSearchCursorInitialState lsmInitialState = (LSMInvertedIndexRangeSearchCursorInitialState) initialState;
+        cmp = lsmInitialState.getOriginalKeyComparator();
+        operationalComponents = lsmInitialState.getOperationalComponents();
+        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+        lsmHarness = null;
+        int numBTrees = operationalComponents.size();
+        rangeCursors = new IIndexCursor[numBTrees];
+
+        MultiComparator keyCmp = lsmInitialState.getKeyComparator();
+        RangePredicate btreePredicate = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
+        ArrayList<IIndexAccessor> btreeAccessors = lsmInitialState.getDeletedKeysBTreeAccessors();
+        for (int i = 0; i < numBTrees; i++) {
+            rangeCursors[i] = btreeAccessors.get(i).createSearchCursor();
+            btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
+        }
+        setPriorityQueueComparator();
+        initPriorityQueue();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 323edd1..446e807 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 
 public class LSMInvertedIndexDiskComponent extends AbstractDiskLSMComponent {
 
@@ -53,4 +54,12 @@
     public BloomFilter getBloomFilter() {
         return bloomFilter;
     }
+
+    @Override
+    public long getComponentSize() {
+        return ((OnDiskInvertedIndex) invIndex).getInvListsFile().getFile().length()
+                + ((OnDiskInvertedIndex) invIndex).getBTree().getFileReference().getFile().length()
+                + deletedKeysBTree.getFileReference().getFile().length()
+                + bloomFilter.getFileReference().getFile().length();
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 45433e7..beb7643 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -28,23 +28,25 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 
-public class LSMInvertedIndexFlushOperation implements ILSMIOOperation {
+public class LSMInvertedIndexFlushOperation implements ILSMIOOperation, Comparable<LSMInvertedIndexFlushOperation> {
     private final ILSMIndexAccessorInternal accessor;
     private final ILSMComponent flushingComponent;
     private final FileReference dictBTreeFlushTarget;
     private final FileReference deletedKeysBTreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMInvertedIndexFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference dictBTreeFlushTarget, FileReference deletedKeysBTreeFlushTarget,
-            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.dictBTreeFlushTarget = dictBTreeFlushTarget;
         this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -88,4 +90,19 @@
     public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.FLUSH;
+    }
+
+    @Override
+    public int compareTo(LSMInvertedIndexFlushOperation o) {
+        return dictBTreeFlushTarget.getFile().getName().compareTo(o.getDictBTreeFlushTarget().getFile().getName());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 7cd921a..5a1e413 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -38,10 +38,11 @@
     private final FileReference deletedKeysBTreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMInvertedIndexMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
             IIndexCursor cursor, FileReference dictBTreeMergeTarget, FileReference deletedKeysBTreeMergeTarget,
-            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
@@ -49,6 +50,7 @@
         this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -103,4 +105,14 @@
     public List<ILSMComponent> getMergingComponents() {
         return mergingComponents;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.MERGE;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 1a9303f..671e3f8 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -35,7 +35,8 @@
 
     private IndexOperation op;
     private final List<ILSMComponent> componentHolder;
-
+    private final List<ILSMComponent> componentsToBeMerged;
+    
     public final IModificationOperationCallback modificationCallback;
     public final ISearchOperationCallback searchCallback;
 
@@ -54,6 +55,7 @@
             IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback)
             throws HyracksDataException {
         this.componentHolder = new LinkedList<ILSMComponent>();
+        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
         this.modificationCallback = modificationCallback;
         this.searchCallback = searchCallback;
 
@@ -84,6 +86,7 @@
     @Override
     public void reset() {
         componentHolder.clear();
+        componentsToBeMerged.clear();
     }
 
     @Override
@@ -118,4 +121,9 @@
         currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
         currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
     }
+    
+    @Override
+    public List<ILSMComponent> getComponentsToBeMerged() {
+        return componentsToBeMerged;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index cd0dde3..a6ff07e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -28,8 +28,8 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -43,7 +43,7 @@
     protected RangePredicate keySearchPred;
 
     public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
-        super(opCtx);
+        super(opCtx, false);
     }
 
     @Override
@@ -76,12 +76,12 @@
             for (int i = 0; i < operationalComponents.size(); i++) {
                 ILSMComponent component = operationalComponents.get(i);
                 if (component.getType() == LSMComponentType.MEMORY) {
-                 // No need for a bloom filter for the in-memory BTree.
+                    // No need for a bloom filter for the in-memory BTree.
                     deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
                 } else {
-                    deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
-                            .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
-                            ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
+                    deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor(
+                            (IBTreeLeafFrame) lsmInitState.getgetDeletedKeysBTreeLeafFrameFactory().createFrame(),
+                            false, ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
                 }
             }
         }
@@ -114,50 +114,4 @@
         }
         return false;
     }
-
-    @Override
-    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
-        while (!outputPriorityQueue.isEmpty() || needPush == true) {
-            if (!outputPriorityQueue.isEmpty()) {
-                PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                // If there is no previous tuple or the previous tuple can be ignored
-                if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
-                        // If the key has been deleted then pop it and set needPush to true.
-                        // We cannot push immediately because the tuple may be
-                        // modified if hasNext() is called
-                        outputElement = outputPriorityQueue.poll();
-                        needPush = true;
-                    } else {
-                        break;
-                    }
-                } else {
-                    // Compare the previous tuple and the head tuple in the PQ
-                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
-                        // If the previous tuple and the head tuple are
-                        // identical
-                        // then pop the head tuple and push the next tuple from
-                        // the tree of head tuple
-
-                        // the head element of PQ is useless now
-                        PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
-                    } else {
-                        // If the previous tuple and the head tuple are different
-                        // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
-                        }
-                        outputElement = null;
-                    }
-                }
-            } else {
-                // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
-                outputElement = null;
-            }
-        }
-    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..4f586f9 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -462,6 +462,10 @@
     public BTree getBTree() {
         return btree;
     }
+    
+    public FileReference getInvListsFile() {
+        return invListsFile;
+    }
 
     public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor {
         private final OnDiskInvertedIndex index;
@@ -561,6 +565,12 @@
         public ByteBuffer allocateFrame() {
             return ByteBuffer.allocate(FRAME_SIZE);
         }
+        
+        @Override
+        public void deallocateFrames(int frameCount) {
+            // TODO Auto-generated method stub
+            
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 8254689..7b54f19 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -23,7 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -40,12 +42,12 @@
 
     public LSMRTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
             RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
-            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
-            ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+            ILinearizeComparatorFactory linearizeCmpFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
         this.btreeComparatorFactories = btreeComparatorFactories;
         this.valueProviderFactories = valueProviderFactories;
         this.rtreePolicyType = rtreePolicyType;
@@ -58,7 +60,7 @@
         return new LSMRTreeDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
                 btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory, linearizeCmpFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
index ef34876..2e1cfaa 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
@@ -15,47 +15,42 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
-public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory implements IIndexDataflowHelperFactory {
+public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private final IVirtualBufferCacheProvider virtualBufferCacheProvider;
     private final IBinaryComparatorFactory[] btreeComparatorFactories;
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
     private final RTreePolicyType rtreePolicyType;
-    private final ILSMMergePolicyProvider mergePolicyProvider;
-    private final ILSMOperationTrackerProvider opTrackerProvider;
-    private final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
-    private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
     private final ILinearizeComparatorFactory linearizeCmpFactory;
 
     public LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
             RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
-            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
-            ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory) {
-        this.virtualBufferCacheProvider = virtualBufferCacheProvider;
+            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+            ILinearizeComparatorFactory linearizeCmpFactory) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory, ioSchedulerProvider,
+                ioOpCallbackFactory, 1.0);
         this.btreeComparatorFactories = btreeComparatorFactories;
         this.valueProviderFactories = valueProviderFactories;
         this.rtreePolicyType = rtreePolicyType;
-        this.mergePolicyProvider = mergePolicyProvider;
-        this.ioSchedulerProvider = ioSchedulerProvider;
-        this.opTrackerProvider = opTrackerProvider;
-        this.ioOpCallbackFactory = ioOpCallbackFactory;
         this.linearizeCmpFactory = linearizeCmpFactory;
     }
 
@@ -64,7 +59,7 @@
             int partition) {
         return new LSMRTreeWithAntiMatterTuplesDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), btreeComparatorFactories,
-                valueProviderFactories, rtreePolicyType, mergePolicyProvider.getMergePolicy(ctx), opTrackerProvider,
-                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
+                valueProviderFactories, rtreePolicyType, mergePolicyFactory.createMergePolicy(mergePolicyProperties),
+                opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index b09e115..673c7ae 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -24,18 +24,15 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -200,11 +197,11 @@
     @Override
     public void getOperationalComponents(ILSMIndexOperationContext ctx) {
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        operationalComponents.clear();
         List<ILSMComponent> immutableComponents = diskComponents;
         int cmc = currentMutableComponentId.get();
         ctx.setCurrentMutableComponentId(cmc);
         int numMutableComponents = memoryComponents.size();
+        operationalComponents.clear();
         switch (ctx.getOperation()) {
             case INSERT:
             case DELETE:
@@ -225,8 +222,10 @@
                 operationalComponents.addAll(immutableComponents);
                 break;
             case MERGE:
-                operationalComponents.addAll(immutableComponents);
+                operationalComponents.addAll(ctx.getComponentsToBeMerged());
                 break;
+            case FULL_MERGE:
+                operationalComponents.addAll(immutableComponents);
             default:
                 throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
         }
@@ -332,38 +331,11 @@
         ctx.modificationCallback.before(tuple);
         ctx.modificationCallback.found(null, tuple);
         if (ctx.getOperation() == IndexOperation.INSERT) {
-            // Before each insert, we must check whether there exist a killer
-            // tuple in the memBTree. If we find a killer tuple, we must truly
-            // delete the existing tuple from the BTree, and then insert it to
-            // memRTree. Otherwise, the old killer tuple will kill the newly
-            // added RTree tuple.
-            RangePredicate btreeRangePredicate = new RangePredicate(tuple, tuple, true, true,
-                    ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
-            ITreeIndexCursor cursor = ctx.currentMutableBTreeAccessor.createSearchCursor();
-            ctx.currentMutableBTreeAccessor.search(cursor, btreeRangePredicate);
-            boolean foundTupleInMemoryBTree = false;
-            try {
-                if (cursor.hasNext()) {
-                    foundTupleInMemoryBTree = true;
-                }
-            } finally {
-                cursor.close();
-            }
-            if (foundTupleInMemoryBTree) {
-                try {
-                    ctx.currentMutableBTreeAccessor.delete(tuple);
-                } catch (TreeIndexNonExistentKeyException e) {
-                    // Tuple has been deleted in the meantime. Do nothing.
-                    // This normally shouldn't happen if we are dealing with
-                    // good citizens since LSMRTree is used as a secondary
-                    // index and a tuple shouldn't be deleted twice without
-                    // insert between them.
-                }
-            } else {
-                ctx.currentMutableRTreeAccessor.insert(tuple);
-            }
-
+            ctx.currentMutableRTreeAccessor.insert(tuple);
         } else {
+            // First remove all entries in the in-memory rtree (if any).
+            ctx.currentMutableRTreeAccessor.delete(tuple);
+            // Insert key into the deleted-keys BTree.
             try {
                 ctx.currentMutableBTreeAccessor.insert(tuple);
             } catch (TreeIndexDuplicateKeyException e) {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a4b6139..3d2cc62 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -177,7 +177,7 @@
         LSMRTreeAccessor accessor = new LSMRTreeAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
                 .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs
-                .getBloomFilterFileReference(), callback));
+                .getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -295,7 +295,8 @@
         ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) accessor,
                 mergingComponents, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs
-                        .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+                        .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback,
+                fileManager.getBaseDir()));
     }
 
     @Override
@@ -309,8 +310,31 @@
 
         LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
                 mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
-        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
 
+        // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
+        // lsmHarness.endSearch() is called once when the r-trees have been merged.
+        if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+                .get(diskComponents.size() - 1)) {
+            // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+            LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+            search(opCtx, btreeCursor, rtreeSearchPred);
+
+            BTree btree = mergedComponent.getBTree();
+            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+            try {
+                while (btreeCursor.hasNext()) {
+                    btreeCursor.next();
+                    ITupleReference tuple = btreeCursor.getTuple();
+                    btreeBulkLoader.add(tuple);
+                }
+            } finally {
+                btreeCursor.close();
+            }
+            btreeBulkLoader.end();
+        }
+
+        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..fe2ed96
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+    public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+        super(opCtx, true);
+    }
+
+    @Override
+    protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+        return false;
+    }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+            IndexException {
+        LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+        cmp = lsmInitialState.getBTreeCmp();
+        operationalComponents = lsmInitialState.getOperationalComponents();
+        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge r-trees.
+        lsmHarness = null;
+        int numBTrees = operationalComponents.size();
+        rangeCursors = new IIndexCursor[numBTrees];
+
+        RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
+        IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
+        for (int i = 0; i < numBTrees; i++) {
+            ILSMComponent component = operationalComponents.get(i);
+            IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
+            rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
+            BTree btree = (BTree) ((LSMRTreeDiskComponent) component).getBTree();
+            btreeAccessors[i] = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            btreeAccessors[i].search(rangeCursors[i], btreePredicate);
+        }
+        setPriorityQueueComparator();
+        initPriorityQueue();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 7bc3f79..c3c0a55 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -54,4 +54,14 @@
     public BloomFilter getBloomFilter() {
         return bloomFilter;
     }
+
+    @Override
+    public long getComponentSize() {
+        long size = rtree.getFileReference().getFile().length();
+        if (btree != null) {
+            size += btree.getFileReference().getFile().length();
+            size += bloomFilter.getFileReference().getFile().length();
+        }
+        return size;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 18d7a7e..0d40eb4 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -27,7 +27,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 
-public class LSMRTreeFlushOperation implements ILSMIOOperation {
+public class LSMRTreeFlushOperation implements ILSMIOOperation, Comparable<LSMRTreeFlushOperation> {
 
     private final ILSMIndexAccessorInternal accessor;
     private final ILSMComponent flushingComponent;
@@ -35,16 +35,18 @@
     private final FileReference btreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference rtreeFlushTarget, FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget,
-            ILSMIOOperationCallback callback) {
+            ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.rtreeFlushTarget = rtreeFlushTarget;
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -89,4 +91,19 @@
     public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.FLUSH;
+    }
+
+    @Override
+    public int compareTo(LSMRTreeFlushOperation o) {
+        return rtreeFlushTarget.getFile().getName().compareTo(o.getRTreeFlushTarget().getFile().getName());
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index da7a2fb..ddf3ebf 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -36,10 +36,11 @@
     private final FileReference btreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
+    private final String indexIdentifier;
 
     public LSMRTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
             ITreeIndexCursor cursor, FileReference rtreeMergeTarget, FileReference btreeMergeTarget,
-            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
@@ -47,6 +48,7 @@
         this.btreeMergeTarget = btreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
+        this.indexIdentifier = indexIdentifier;
     }
 
     @Override
@@ -104,4 +106,14 @@
     public List<ILSMComponent> getMergingComponents() {
         return mergingComponents;
     }
+
+    @Override
+    public String getIndexUniqueIdentifier() {
+        return indexIdentifier;
+    }
+
+    @Override
+    public LSMIOOpertionType getIOOpertionType() {
+        return LSMIOOpertionType.MERGE;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index b94feba..132e55b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -48,6 +48,7 @@
 
     private IndexOperation op;
     public final List<ILSMComponent> componentHolder;
+    private final List<ILSMComponent> componentsToBeMerged;
     public final IModificationOperationCallback modificationCallback;
     public final ISearchOperationCallback searchCallback;
 
@@ -76,6 +77,7 @@
         currentRTreeOpContext = rtreeOpContexts[0];
         currentBTreeOpContext = btreeOpContexts[0];
         this.componentHolder = new LinkedList<ILSMComponent>();
+        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
         this.modificationCallback = modificationCallback;
         this.searchCallback = searchCallback;
     }
@@ -101,6 +103,7 @@
     @Override
     public void reset() {
         componentHolder.clear();
+        componentsToBeMerged.clear();
     }
 
     @Override
@@ -126,4 +129,9 @@
     public IModificationOperationCallback getModificationCallback() {
         return modificationCallback;
     }
+    
+    @Override
+    public List<ILSMComponent> getComponentsToBeMerged() {
+        return componentsToBeMerged;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 30dd467..f669585 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -79,7 +79,7 @@
                 ITupleReference currentTuple = rtreeCursors[currentCursor].getTuple();
 
                 boolean killerTupleFound = false;
-                for (int i = 0; i <= currentCursor; i++) {
+                for (int i = 0; i < currentCursor; i++) {
                     btreeCursors[i].reset();
                     btreeRangePredicate.setHighKey(currentTuple, true);
                     btreeRangePredicate.setLowKey(currentTuple, true);
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index dd31165..2e6fc78 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -25,6 +25,8 @@
 
 public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
 
+    // TODO: This class can be removed and instead use a search cursor that uses a logic similar 
+    // to the one in LSMRTreeWithAntiMatterTuplesSearchCursor
     private ILinearizeComparator linearizeCmp;
     private boolean[] depletedRtreeCursors;
     private int foundIn = -1;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 812e942..bfd0260 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -40,7 +40,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
@@ -76,9 +75,9 @@
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallback ioOpCallback) {
         super(virtualBufferCaches, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
-                btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(diskRTreeFactory),
-                diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields,
-                linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
+                btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(
+                        diskRTreeFactory), diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories,
+                linearizer, comparatorFields, linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
         bulkLoaComponentFactory = new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(bulkLoadRTreeFactory);
         this.bTreeTupleSorter = null;
     }
@@ -156,7 +155,7 @@
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
-                .getInsertIndexFileReference(), null, null, callback));
+                .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
     }
 
     @Override
@@ -248,11 +247,16 @@
         LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
         rctx.setOperation(IndexOperation.MERGE);
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
-        ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx);
+        boolean returnDeletedTuples = false;
+        if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+                .size() - 1)) {
+            returnDeletedTuples = true;
+        }
+        ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx, returnDeletedTuples);
         LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
-                .getInsertIndexFileReference(), null, null, callback));
+                .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index cbaf3b3..7099d7d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -54,7 +54,11 @@
     private int numMutableComponents;
 
     public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
-        super(opCtx);
+        this(opCtx, false);
+    }
+
+    public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+        super(opCtx, returnDeletedTuples);
         currentCursor = 0;
     }
 
@@ -152,7 +156,7 @@
             while (super.hasNext()) {
                 super.next();
                 ITupleReference diskRTreeTuple = super.getTuple();
-                if (searchMemBTrees(diskRTreeTuple, numMutableComponents - 1)) {
+                if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
                     foundNext = true;
                     frameTuple = diskRTreeTuple;
                     return true;
@@ -216,7 +220,7 @@
 
     private boolean searchMemBTrees(ITupleReference tuple, int lastBTreeToSearch) throws HyracksDataException,
             IndexException {
-        for (int i = 0; i <= lastBTreeToSearch; i++) {
+        for (int i = 0; i < lastBTreeToSearch; i++) {
             btreeCursors[i].reset();
             btreeRangePredicate.setHighKey(tuple, true);
             btreeRangePredicate.setLowKey(tuple, true);
@@ -261,50 +265,4 @@
             }
         }
     }
-
-    @Override
-    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
-        while (!outputPriorityQueue.isEmpty() || needPush == true) {
-            if (!outputPriorityQueue.isEmpty()) {
-                PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                // If there is no previous tuple or the previous tuple can be ignored
-                if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
-                        // If the key has been deleted then pop it and set needPush to true.
-                        // We cannot push immediately because the tuple may be
-                        // modified if hasNext() is called
-                        outputElement = outputPriorityQueue.poll();
-                        needPush = true;
-                    } else {
-                        break;
-                    }
-                } else {
-                    // Compare the previous tuple and the head tuple in the PQ
-                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
-                        // If the previous tuple and the head tuple are
-                        // identical
-                        // then pop the head tuple and push the next tuple from
-                        // the tree of head tuple
-
-                        // the head element of PQ is useless now
-                        PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
-                    } else {
-                        // If the previous tuple and the head tuple are different
-                        // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
-                        }
-                        outputElement = null;
-                    }
-                }
-            } else {
-                // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
-                outputElement = null;
-            }
-        }
-    }
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 8090564..2992dfe 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -119,7 +119,8 @@
 
     @Override
     public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        pinSanityCheck(dpid);
+        // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
+        //pinSanityCheck(dpid);
         CachedPage cPage = null;
         int hash = hash(dpid);
         CacheBucket bucket = pageMap[hash];
@@ -142,7 +143,8 @@
 
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
-        pinSanityCheck(dpid);
+        // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
+        //pinSanityCheck(dpid);
         CachedPage cPage = findPage(dpid, newPage);
         if (!newPage) {
             // Resolve race of multiple threads trying to read the page from
@@ -760,8 +762,8 @@
         BufferedFileHandle fInfo = null;
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
-            ioManager.sync(fInfo.getFileHandle(), metadata);
         }
+        ioManager.sync(fInfo.getFileHandle(), metadata);
     }
 
     @Override
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 10b5582..f18099b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -46,6 +46,10 @@
     public ByteBuffer allocateFrame() {
         return jobletContext.allocateFrame();
     }
+    
+    @Override
+    public void deallocateFrames(int frameCount) {
+    }
 
     @Override
     public int getFrameSize() {
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index f7aa7f4..9729d51 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
 import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 
@@ -70,7 +71,8 @@
             }
 
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((LSMBTree) ctx.getIndex()).getImmutableComponents());
 
             orderedIndexTestUtils.checkPointSearches(ctx);
             orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 4a96131..35ecc20 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -106,7 +106,7 @@
                 break;
 
             case MERGE:
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmBTree.getImmutableComponents());
                 break;
 
             default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 3591f78..fb06a7e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -20,10 +20,12 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -37,9 +39,9 @@
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -50,8 +52,8 @@
 
 public class LSMTreeRunner implements IExperimentRunner {
 
-    private static final int MAX_OPEN_FILES = 10000;
-    private static final int HYRACKS_FRAME_SIZE = 128;
+    private static final int MAX_OPEN_FILES = Integer.MAX_VALUE;
+    private static final int HYRACKS_FRAME_SIZE = 131072;
 
     protected IHyracksTaskContext ctx;
     protected IOManager ioManager;
@@ -61,7 +63,7 @@
 
     protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     protected final static String sep = System.getProperty("file.separator");
-    protected final static String classDir = "/lsmtree/";
+    protected final static String classDir = "/tmp/lsmtree/";
     protected String onDiskDir;
     protected FileReference file;
 
@@ -71,6 +73,11 @@
     protected IBufferCache memBufferCache;
     private final int onDiskPageSize;
     private final int onDiskNumPages;
+    private final static ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public LSMTreeRunner(int numBatches, int inMemPageSize, int inMemNumPages, int onDiskPageSize, int onDiskNumPages,
             ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields,
@@ -93,11 +100,13 @@
         List<IVirtualBufferCache> virtualBufferCaches = new ArrayList<IVirtualBufferCache>();
         for (int i = 0; i < 2; i++) {
             IVirtualBufferCache virtualBufferCache = new VirtualBufferCache(new HeapBufferAllocator(), inMemPageSize,
-                    inMemNumPages);
+                    inMemNumPages / 2);
             virtualBufferCaches.add(virtualBufferCache);
         }
 
-        this.ioScheduler = SynchronousScheduler.INSTANCE;
+        this.ioScheduler = AsynchronousScheduler.INSTANCE;
+        AsynchronousScheduler.INSTANCE.init(threadFactory);
+
         lsmtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fmp, typeTraits, cmpFactories,
                 bloomFilterKeyFields, bloomFilterFalsePositiveRate, NoMergePolicy.INSTANCE,
                 new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallback.INSTANCE);
@@ -133,15 +142,23 @@
 
     @Override
     public void reset() throws Exception {
+        try {
+            lsmtree.deactivate();
+        } catch (HyracksDataException e) {
+            // ignore
+        }
+        try {
+            lsmtree.destroy();
+        } catch (HyracksDataException e) {
+            // ignore
+        }
+
         lsmtree.create();
+        lsmtree.activate();
     }
 
     @Override
     public void deinit() throws Exception {
-        bufferCache.closeFile(lsmtreeFileId);
-        bufferCache.close();
-        memBufferCache.closeFile(lsmtreeFileId);
-        memBufferCache.close();
     }
 
     public class LSMTreeThread extends Thread {
@@ -166,6 +183,7 @@
                         } catch (TreeIndexException e) {
                         }
                     }
+                    dataGen.releaseBatch(batch);
                 }
             } catch (Exception e) {
                 e.printStackTrace();
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
index 641362e..2354550 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
@@ -31,18 +31,20 @@
     public static void main(String[] args) throws Exception {
         // Disable logging so we can better see the output times.
         Enumeration<String> loggers = LogManager.getLogManager().getLoggerNames();
-        while(loggers.hasMoreElements()) {
+        while (loggers.hasMoreElements()) {
             String loggerName = loggers.nextElement();
             Logger logger = LogManager.getLogManager().getLogger(loggerName);
             logger.setLevel(Level.OFF);
         }
-        
-        int numTuples = 100000; // 100K
+        boolean sorted = Boolean.parseBoolean(args[0]);
+        int numThreads =  Integer.parseInt(args[1]);
+
+        //int numTuples = 100000; // 100K
         //int numTuples = 1000000; // 1M
         //int numTuples = 2000000; // 2M
         //int numTuples = 3000000; // 3M
         //int numTuples = 10000000; // 10M
-        //int numTuples = 20000000; // 20M
+        int numTuples = 20000000; // 20M
         //int numTuples = 30000000; // 30M
         //int numTuples = 40000000; // 40M
         //int numTuples = 60000000; // 60M
@@ -50,27 +52,41 @@
         //int numTuples = 200000000; // 200M
         int batchSize = 10000;
         int numBatches = numTuples / batchSize;
-        
+
+        int payLoadSize = 240;
         ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
-        ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, 30);
-        
-        IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, fieldSerdes.length);
-        
+        ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, payLoadSize);
+
+        IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
+                fieldSerdes.length);
+        int[] bloomFilterKeyFields = new int[cmpFactories.length];
+        for (int i = 0; i < bloomFilterKeyFields.length; i++) {
+            bloomFilterKeyFields[i] = i;
+        }
+        double bloomFilterFalsePositiveRate = 0.01;
+
         //int repeats = 1000;
         int repeats = 1;
         long[] times = new long[repeats];
 
-        int numThreads = 2;
+//        int numThreads = 4;
+//        boolean sorted = true;
         for (int i = 0; i < repeats; i++) {
             //ConcurrentSkipListRunner runner = new ConcurrentSkipListRunner(numBatches, batchSize, tupleSize, typeTraits, cmp);
-            InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
+            //InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
             //BTreeBulkLoadRunner runner = new BTreeBulkLoadRunner(numBatches, 8192, 100000, typeTraits, cmp, 1.0f);
-        	//BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
-        	//String btreeName = "071211";
-        	//BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
-        	//LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
-        	//LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp); 
-            DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, 30, 50, 10, false);
+            //BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
+            //String btreeName = "071211";
+            //BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
+            //LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
+            //LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp); 
+            int inMemPageSize = 131072; // 128kb
+            int onDiskPageSize = inMemPageSize;
+            int inMemNumPages = 8192; // 1GB
+            int onDiskNumPages = 16384; // 2GB
+            LSMTreeRunner runner = new LSMTreeRunner(numBatches, inMemPageSize, inMemNumPages, onDiskPageSize,
+                    onDiskNumPages, typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate);
+            DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, payLoadSize, 50, 10, sorted);
             dataGen.start();
             runner.reset();
             times[i] = runner.runExperiment(dataGen, numThreads);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index 5115b7e..3713498 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
                 invIndex.activate();
             }
             // Perform merge.
-            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((LSMInvertedIndex) invIndex).getImmutableComponents());
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 523557d..53076fa 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
                 invIndex.activate();
             }
             // Perform merge.
-            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((LSMInvertedIndex) invIndex).getImmutableComponents());
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index e1570af..35afd09 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -109,7 +109,7 @@
             }
 
             case MERGE: {
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, invIndex.getImmutableComponents());
                 break;
             }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 18528c4..a410b2a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
 import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -72,7 +73,8 @@
             }
 
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((AbstractLSMRTree) ctx.getIndex()).getImmutableComponents());
 
             rTreeTestUtils.checkScan(ctx);
             rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index b3fddc8..c7a86d9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -73,7 +73,7 @@
                 break;
 
             case MERGE:
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmRTree.getImmutableComponents());
                 break;
 
             default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 12d7742..383cbc4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 
@@ -61,7 +62,8 @@
                 break;
 
             case MERGE:
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                        ((AbstractLSMRTree) lsmRTree).getImmutableComponents());
                 break;
 
             default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
index 7e71aea..c67386e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
@@ -96,8 +96,9 @@
         }
 
         bufferCache.closeFile(fileId);
-
-        boolean exceptionThrown = false;
+        
+        // This code is commented because the method pinSanityCheck in the BufferCache is commented.
+        /*boolean exceptionThrown = false;
 
         // tryPin should fail since file is not open
         try {
@@ -114,7 +115,7 @@
         } catch (HyracksDataException e) {
             exceptionThrown = true;
         }
-        Assert.assertTrue(exceptionThrown);
+        Assert.assertTrue(exceptionThrown);*/
 
         // open file again
         bufferCache.openFile(fileId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 88edc91..b345e01 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -25,7 +25,9 @@
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.logging.Logger;
 
@@ -75,7 +77,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
@@ -136,6 +138,12 @@
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
+
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
@@ -478,9 +486,9 @@
 
     protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
         if (BspUtils.useLSM(conf)) {
-            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
-                    3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
-                    NoOpIOOperationCallback.INSTANCE, 0.01);
+            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(),
+                    new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, NoOpOperationTrackerProvider.INSTANCE,
+                    SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, 0.01);
         } else {
             return new BTreeDataflowHelperFactory();
         }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index acd766e..d243c8a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -99,7 +99,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
                 ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
@@ -113,6 +113,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
@@ -121,7 +122,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }