Add Support for Upsert Operation

This change adds support for upsert operations. it includes
creating a primary and secondary upsert operators in addition
to adding a new function "before" to the index operation call
back to correctly perform locking for the upsert operation.

Change-Id: I2705f43b6e6d187ee29b9ba5a7946d422990022a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/476
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5388092..7e9da44 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -29,9 +29,9 @@
     EXTENSION_OPERATOR,
     EXTERNAL_LOOKUP,
     GROUP,
-    INDEX_INSERT_DELETE,
+    INDEX_INSERT_DELETE_UPSERT,
     INNERJOIN,
-    INSERT_DELETE,
+    INSERT_DELETE_UPSERT,
     LEFTOUTERJOIN,
     LIMIT,
     MATERIALIZE,
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 53857d2..823ebae 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -48,13 +48,13 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
 
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
@@ -63,7 +63,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -193,4 +193,16 @@
 
     public IFunctionInfo lookupFunction(FunctionIdentifier fid);
 
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
+            List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
+            RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException;
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
similarity index 72%
rename from algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
rename to algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index e9e3b01..c06cc18 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -22,21 +22,19 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class IndexInsertDeleteOperator extends AbstractLogicalOperator {
+public class IndexInsertDeleteUpsertOperator extends AbstractLogicalOperator {
 
     private final IDataSourceIndex<?, ?> dataSourceIndex;
     private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
@@ -48,8 +46,11 @@
     private final Kind operation;
     private final boolean bulkload;
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+    // used for upsert operations
+    private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
+    private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
 
-    public IndexInsertDeleteOperator(IDataSourceIndex<?, ?> dataSourceIndex,
+    public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
             List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
             Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
         this.dataSourceIndex = dataSourceIndex;
@@ -69,22 +70,44 @@
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
         boolean b = false;
+        // Primary
         for (int i = 0; i < primaryKeyExprs.size(); i++) {
             if (visitor.transform(primaryKeyExprs.get(i))) {
                 b = true;
             }
         }
+        // Secondary
         for (int i = 0; i < secondaryKeyExprs.size(); i++) {
             if (visitor.transform(secondaryKeyExprs.get(i))) {
                 b = true;
             }
         }
+        // Additional Filtering <For upsert>
+        if (additionalFilteringExpressions != null) {
+            for (int i = 0; i < additionalFilteringExpressions.size(); i++) {
+                if (visitor.transform(additionalFilteringExpressions.get(i))) {
+                    b = true;
+                }
+            }
+        }
+        // Old secondary <For upsert>
+        if (prevSecondaryKeyExprs != null) {
+            for (int i = 0; i < prevSecondaryKeyExprs.size(); i++) {
+                if (visitor.transform(prevSecondaryKeyExprs.get(i))) {
+                    b = true;
+                }
+            }
+        }
+        // Old Filtering <For upsert>
+        if (prevAdditionalFilteringExpression != null) {
+            visitor.transform(prevAdditionalFilteringExpression);
+        }
         return b;
     }
 
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitIndexInsertDeleteOperator(this, arg);
+        return visitor.visitIndexInsertDeleteUpsertOperator(this, arg);
     }
 
     @Override
@@ -99,7 +122,7 @@
 
     @Override
     public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.INDEX_INSERT_DELETE;
+        return LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT;
     }
 
     @Override
@@ -143,4 +166,19 @@
         return additionalFilteringExpressions;
     }
 
+    public List<Mutable<ILogicalExpression>> getPrevSecondaryKeyExprs() {
+        return prevSecondaryKeyExprs;
+    }
+
+    public void setPrevSecondaryKeyExprs(List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs) {
+        this.prevSecondaryKeyExprs = prevSecondaryKeyExprs;
+    }
+
+    public Mutable<ILogicalExpression> getPrevAdditionalFilteringExpression() {
+        return prevAdditionalFilteringExpression;
+    }
+
+    public void setPrevAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
+        this.prevAdditionalFilteringExpression = prevAdditionalFilteringExpression;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
deleted file mode 100644
index b95d279..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class InsertDeleteOperator extends AbstractLogicalOperator {
-
-    public enum Kind {
-        INSERT,
-        DELETE
-    }
-
-    private final IDataSource<?> dataSource;
-
-    private final Mutable<ILogicalExpression> payloadExpr;
-
-    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
-
-    private final Kind operation;
-    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
-
-    private final boolean bulkload;
-
-    public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
-            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
-        this.dataSource = dataSource;
-        this.payloadExpr = payloadExpr;
-        this.primaryKeyExprs = primaryKeyExprs;
-        this.operation = operation;
-        this.bulkload = bulkload;
-    }
-
-    @Override
-    public void recomputeSchema() throws AlgebricksException {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
-    }
-
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
-        boolean changed = false;
-        changed = transform.transform(payloadExpr);
-        for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
-            changed |= transform.transform(e);
-        }
-        return changed;
-    }
-
-    @Override
-    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitInsertDeleteOperator(this, arg);
-    }
-
-    @Override
-    public boolean isMap() {
-        return false;
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.INSERT_DELETE;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
-        return primaryKeyExprs;
-    }
-
-    public IDataSource<?> getDataSource() {
-        return dataSource;
-    }
-
-    public Mutable<ILogicalExpression> getPayloadExpression() {
-        return payloadExpr;
-    }
-
-    public Kind getOperation() {
-        return operation;
-    }
-
-    public boolean isBulkload() {
-        return bulkload;
-	}
-
-    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
-        this.additionalFilteringExpressions = additionalFilteringExpressions;
-    }
-
-    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
-        return additionalFilteringExpressions;
-    }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
new file mode 100644
index 0000000..607db69
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
+
+    public enum Kind {
+        INSERT,
+        DELETE,
+        UPSERT
+    }
+
+    private final IDataSource<?> dataSource;
+    private final Mutable<ILogicalExpression> payloadExpr;
+    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+    private final Kind operation;
+    private final boolean bulkload;
+    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+    private LogicalVariable prevRecordVar;
+    private Object prevRecordType;
+    private LogicalVariable prevFilterVar;
+    private Object prevFilterType;
+
+    public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
+        this.dataSource = dataSource;
+        this.payloadExpr = payloadExpr;
+        this.primaryKeyExprs = primaryKeyExprs;
+        this.operation = operation;
+        this.bulkload = bulkload;
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>();
+        schema.addAll(inputs.get(0).getValue().getSchema());
+        if (operation == Kind.UPSERT) {
+            // The upsert case also produces the previous record
+            schema.add(prevRecordVar);
+            if (prevFilterVar != null) {
+                schema.add(prevFilterVar);
+            }
+        }
+    }
+
+    public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
+        if (prevRecordVar != null) {
+            producedVariables.add(prevRecordVar);
+        }
+        if (prevFilterVar != null) {
+            producedVariables.add(prevFilterVar);
+        }
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        boolean changed = false;
+        changed = transform.transform(payloadExpr);
+        for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
+            changed |= transform.transform(e);
+        }
+        if (additionalFilteringExpressions != null) {
+            for (Mutable<ILogicalExpression> e : additionalFilteringExpressions) {
+                changed |= transform.transform(e);
+            }
+        }
+        return changed;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitInsertDeleteUpsertOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                target.addAllVariables(sources[0]);
+                if (operation == Kind.UPSERT) {
+                    target.addVariable(prevRecordVar);
+                    if (prevFilterVar != null) {
+                        target.addVariable(prevFilterVar);
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.INSERT_DELETE_UPSERT;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+        if (operation == Kind.UPSERT) {
+            env.setVarType(prevRecordVar, prevRecordType);
+            if (prevFilterVar != null) {
+                env.setVarType(prevFilterVar, prevFilterType);
+            }
+        }
+        return env;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
+        return primaryKeyExprs;
+    }
+
+    public IDataSource<?> getDataSource() {
+        return dataSource;
+    }
+
+    public Mutable<ILogicalExpression> getPayloadExpression() {
+        return payloadExpr;
+    }
+
+    public Kind getOperation() {
+        return operation;
+    }
+
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
+    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
+        this.additionalFilteringExpressions = additionalFilteringExpressions;
+    }
+
+    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
+        return additionalFilteringExpressions;
+    }
+
+    public LogicalVariable getPrevRecordVar() {
+        return prevRecordVar;
+    }
+
+    public void setPrevRecordVar(LogicalVariable prevRecordVar) {
+        this.prevRecordVar = prevRecordVar;
+    }
+
+    public void setPrevRecordType(Object recordType) {
+        prevRecordType = recordType;
+    }
+
+    public LogicalVariable getPrevFilterVar() {
+        return prevFilterVar;
+    }
+
+    public void setPrevFilterVar(LogicalVariable prevFilterVar) {
+        this.prevFilterVar = prevFilterVar;
+    }
+
+    public Object getPrevFilterType() {
+        return prevFilterType;
+    }
+
+    public void setPrevFilterType(Object prevFilterType) {
+        this.prevFilterType = prevFilterType;
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 2ba4969..343ace8 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -62,7 +61,8 @@
     }
 
     @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
         return false;
     }
 
@@ -73,7 +73,7 @@
 
     @Override
     public boolean isMap() {
-        return false;
+        return true;
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
index e1a793c..c69ead7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 25993bb..b2c97c3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -56,9 +56,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -509,14 +509,14 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx)
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext ctx)
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index dc535ea..7a4e7e1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -45,9 +45,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -454,11 +454,11 @@
     }
 
     @Override
-    public Boolean visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+        if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT)
             return Boolean.FALSE;
-        InsertDeleteOperator insertOpArg = (InsertDeleteOperator) copyAndSubstituteVar(op, arg);
+        InsertDeleteUpsertOperator insertOpArg = (InsertDeleteUpsertOperator) copyAndSubstituteVar(op, arg);
         boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
         if (!op.getDataSource().equals(insertOpArg.getDataSource()))
             isomorphic = false;
@@ -468,12 +468,12 @@
     }
 
     @Override
-    public Boolean visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+    public Boolean visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE)
+        if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT)
             return Boolean.FALSE;
-        IndexInsertDeleteOperator insertOpArg = (IndexInsertDeleteOperator) copyAndSubstituteVar(op, arg);
+        IndexInsertDeleteUpsertOperator insertOpArg = (IndexInsertDeleteUpsertOperator) copyAndSubstituteVar(op, arg);
         boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
         if (!op.getDataSourceIndex().equals(insertOpArg.getDataSourceIndex()))
             isomorphic = false;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 376c2bc..c46ffde 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -46,9 +46,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -251,13 +251,13 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8e378bc..4e5b13e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -37,9 +37,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -259,14 +259,14 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext arg)
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext arg)
             throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext arg)
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext arg)
             throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 5cf30c7..2e402fc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -42,9 +42,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -266,12 +266,12 @@
     }
 
     @Override
-    public ILogicalOperator visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
         List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
-        InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
+        InsertDeleteUpsertOperator insertDeleteOp = new InsertDeleteUpsertOperator(op.getDataSource(),
                 deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(),
                 op.isBulkload());
         insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
@@ -279,7 +279,7 @@
     }
 
     @Override
-    public ILogicalOperator visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg)
+    public ILogicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
             throws AlgebricksException {
         List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
@@ -289,7 +289,7 @@
                 ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
-        IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
+        IndexInsertDeleteUpsertOperator indexInsertDeleteOp = new IndexInsertDeleteUpsertOperator(op.getDataSourceIndex(),
                 newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(),
                 op.isBulkload());
         indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index da5d2b2..8df772b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -23,7 +23,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -42,9 +41,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -241,12 +240,14 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+        op.getProducedVariables(producedVariables);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+            throws AlgebricksException {
         return null;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index dc89c12..b488df1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -42,9 +42,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -283,13 +283,13 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
     }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 4b791c1..91ff073 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -40,9 +40,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -388,7 +388,7 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
@@ -399,7 +399,7 @@
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
             Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().substituteVar(pair.first, pair.second);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e82b4f7..cfc57c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -40,9 +40,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -344,11 +344,15 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) {
+        //1. The record variable
         op.getPayloadExpression().getValue().getUsedVariables(usedVariables);
+
+        //2. The primary key variables
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().getUsedVariables(usedVariables);
         }
+        //3. The filters variables
         if (op.getAdditionalFilteringExpressions() != null) {
             for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
                 e.getValue().getUsedVariables(usedVariables);
@@ -358,7 +362,7 @@
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) {
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) {
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().getUsedVariables(usedVariables);
         }
@@ -370,6 +374,14 @@
                 e.getValue().getUsedVariables(usedVariables);
             }
         }
+        if (op.getPrevAdditionalFilteringExpression() != null) {
+            op.getPrevAdditionalFilteringExpression().getValue().getUsedVariables(usedVariables);
+        }
+        if (op.getPrevSecondaryKeyExprs() != null) {
+            for (Mutable<ILogicalExpression> e : op.getPrevSecondaryKeyExprs()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
         return null;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 0352f83..28f783d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -34,18 +34,43 @@
 
 public class VariableUtilities {
 
+    /***
+     * Adds the used variables in the logical operator to the list of used variables
+     *
+     * @param op
+     *          The target operator
+     * @param usedVariables
+     *          A list to be filled with variables used in the logical operator op.
+     * @throws AlgebricksException
+     */
     public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
             throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
         op.accept(visitor, null);
     }
 
+    /**
+     * Adds the variables produced in the logical operator in the list of produced variables
+     * @param op
+     *          The target operator
+     * @param producedVariables
+     *          The variables produced in the logical operator
+     * @throws AlgebricksException
+     */
     public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
             throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
         op.accept(visitor, null);
     }
 
+    /**
+     * Adds the variables that are live after the execution of this operator to the list of schema variables.
+     * @param op
+     *          The target logical operator
+     * @param schemaVariables
+     *          The list of live variables. The output of the operator and the propagated outputs of its children
+     * @throws AlgebricksException
+     */
     public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
             throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 449c04f..036ac05 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -35,8 +35,8 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -92,7 +92,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+        InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
         assert insertDeleteOp.getOperation() == Kind.INSERT;
         assert insertDeleteOp.isBulkload();
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index 15144ab..b837bfa 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -38,8 +38,8 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
@@ -117,7 +117,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+        IndexInsertDeleteUpsertOperator indexInsertDeleteOp = (IndexInsertDeleteUpsertOperator) op;
         assert indexInsertDeleteOp.getOperation() == Kind.INSERT;
         assert indexInsertDeleteOp.isBulkload();
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
similarity index 76%
rename from algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
rename to algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 29673b4..4702361 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -37,29 +36,31 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 
-public class IndexInsertDeletePOperator extends AbstractPhysicalOperator {
+public class IndexInsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeys;
     private final List<LogicalVariable> secondaryKeys;
     private final ILogicalExpression filterExpr;
     private final IDataSourceIndex<?, ?> dataSourceIndex;
     private final List<LogicalVariable> additionalFilteringKeys;
+    private final List<LogicalVariable> prevSecondaryKeys;
+    private final LogicalVariable prevAdditionalFilteringKey;
 
-    public IndexInsertDeletePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+    public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
-            IDataSourceIndex<?, ?> dataSourceIndex) {
+            IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey) {
         this.primaryKeys = primaryKeys;
         this.secondaryKeys = secondaryKeys;
         if (filterExpr != null) {
@@ -69,6 +70,8 @@
         }
         this.dataSourceIndex = dataSourceIndex;
         this.additionalFilteringKeys = additionalFilteringKeys;
+        this.prevSecondaryKeys = prevSecondaryKeys;
+        this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
     }
 
     @Override
@@ -79,7 +82,7 @@
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
     }
 
     @Override
@@ -100,8 +103,8 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        IndexInsertDeleteOperator insertDeleteOp = (IndexInsertDeleteOperator) op;
+                    throws AlgebricksException {
+        IndexInsertDeleteUpsertOperator insertDeleteUpsertOp = (IndexInsertDeleteUpsertOperator) op;
         IMetadataProvider mp = context.getMetadataProvider();
 
         JobSpecification spec = builder.getJobSpec();
@@ -109,18 +112,22 @@
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
-        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
-        if (insertDeleteOp.getOperation() == Kind.INSERT) {
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteUpsertOp);
+        if (insertDeleteUpsertOp.getOperation() == Kind.INSERT) {
             runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
                     primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
-        } else {
+        } else if (insertDeleteUpsertOp.getOperation() == Kind.DELETE) {
             runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
                     primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+        } else if (insertDeleteUpsertOp.getOperation() == Kind.UPSERT) {
+            runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
+                    prevAdditionalFilteringKey, inputDesc, context, spec);
         }
-        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+        builder.contributeHyracksOperator(insertDeleteUpsertOp, runtimeAndConstraints.first);
         builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
-        ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+        ILogicalOperator src = insertDeleteUpsertOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, insertDeleteUpsertOp, 0);
     }
 
     @Override
@@ -132,4 +139,12 @@
     public boolean expensiveThanMaterialization() {
         return false;
     }
+
+    public List<LogicalVariable> getPrevSecondaryKeys() {
+        return prevSecondaryKeys;
+    }
+
+    public LogicalVariable getPrevFilteringKeys() {
+        return prevAdditionalFilteringKey;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
similarity index 83%
rename from algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
rename to algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index c774f44..d844f37 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -34,12 +34,11 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -47,19 +46,24 @@
 import org.apache.hyracks.api.job.JobSpecification;
 
 @SuppressWarnings("rawtypes")
-public class InsertDeletePOperator extends AbstractPhysicalOperator {
+public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
     private LogicalVariable payload;
     private List<LogicalVariable> keys;
     private IDataSource<?> dataSource;
     private final List<LogicalVariable> additionalFilteringKeys;
+    private final Kind operation;
+    private final LogicalVariable prevPayload;
 
-    public InsertDeletePOperator(LogicalVariable payload, List<LogicalVariable> keys,
-            List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource) {
+    public InsertDeleteUpsertPOperator(LogicalVariable payload, List<LogicalVariable> keys,
+            List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource, Kind operation,
+            LogicalVariable prevPayload) {
         this.payload = payload;
         this.keys = keys;
         this.dataSource = dataSource;
         this.additionalFilteringKeys = additionalFilteringKeys;
+        this.operation = operation;
+        this.prevPayload = prevPayload;
     }
 
     @Override
@@ -67,10 +71,11 @@
         return PhysicalOperatorTag.INSERT_DELETE;
     }
 
+    // Delivered Properties of this will be (Sorted on PK, Partitioned on PK)
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
     }
 
     @Override
@@ -90,8 +95,8 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+                    throws AlgebricksException {
+        InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
         IMetadataProvider mp = context.getMetadataProvider();
         IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
         JobSpecification spec = builder.getJobSpec();
@@ -99,12 +104,17 @@
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
-        if (insertDeleteOp.getOperation() == Kind.INSERT) {
+        if (operation == Kind.INSERT) {
             runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
                     additionalFilteringKeys, inputDesc, context, spec, false);
-        } else {
+        } else if (operation == Kind.DELETE) {
             runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
                     additionalFilteringKeys, inputDesc, context, spec);
+        } else if (operation == Kind.UPSERT) {
+            runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    additionalFilteringKeys, prevPayload, inputDesc, context, spec);
+        } else {
+            throw new AlgebricksException("Unsupported Operation " + operation);
         }
 
         builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index e1983d2..98427fe 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -35,7 +35,7 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 17f2285..cf0f1c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -38,10 +38,10 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -325,12 +325,16 @@
     }
 
     @Override
-    public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+    public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        String header = getIndexOpString(op.getOperation());
         addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
                 .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
         pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+        if (op.getOperation() == Kind.UPSERT) {
+            buffer.append(" out: ([" + op.getPrevRecordVar() + "] <-{record-before-upsert}) ");
+        }
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
         }
@@ -338,19 +342,38 @@
     }
 
     @Override
-    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+    public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Integer indent)
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        String header = getIndexOpString(op.getOperation());
         addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
                 .append(op.getDataSourceIndex().getDataSource()).append(" from ");
-        pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        if (op.getOperation() == Kind.UPSERT) {
+            buffer.append(" replace:");
+            pprintExprList(op.getPrevSecondaryKeyExprs(), buffer, indent);
+            buffer.append(" with:");
+            pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        } else {
+            pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        }
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
         }
         return buffer.toString();
     }
 
+    public String getIndexOpString(Kind opKind) {
+        switch (opKind) {
+            case DELETE:
+                return "delete from ";
+            case INSERT:
+                return "insert into ";
+            case UPSERT:
+                return "upsert into ";
+        }
+        return null;
+    }
+
     @Override
     public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 285812c..53c8b69 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -29,9 +29,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -113,9 +113,9 @@
 
     public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
 
-    public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
+    public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
 
-    public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
+    public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
 
     public R visitExternalDataLookupOperator(ExternalDataLookupOperator op, T arg) throws AlgebricksException;
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
index b5ce212..73628c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
@@ -19,8 +19,8 @@
 package org.apache.hyracks.algebricks.core.algebra.visitors;
 
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -43,12 +43,12 @@
     }
 
     @Override
-    public default R visitInsertDeleteOperator(InsertDeleteOperator op, T arg) {
+    public default R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public default R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T arg) {
+    public default R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) {
         throw new UnsupportedOperationException();
     }
 
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 8f9ab9f..8bf3dbb 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -84,7 +84,7 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -141,15 +141,15 @@
     @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         PigletFileDataSink ds = (PigletFileDataSink) sink;
         FileSplit[] fileSplits = ds.getFileSplits();
         String[] locations = new String[fileSplits.length];
         for (int i = 0; i < fileSplits.length; ++i) {
             locations[i] = fileSplits[i].getNodeName();
         }
-        IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories, fileSplits[0]
-                .getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
+        IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories,
+                fileSplits[0].getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
         AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
         return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(prf, constraint);
     }
@@ -178,15 +178,11 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-            IDataSourceIndex<String, String> dataSource,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr,
-            RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, boolean bulkload) throws AlgebricksException {
+            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -197,16 +193,16 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
-            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
@@ -235,4 +231,24 @@
         return null;
     }
 
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 394524b..d85ffe9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -23,7 +23,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -45,9 +44,10 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -62,8 +62,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeletePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeletePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -95,7 +95,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         // if (context.checkIfInDontApplySet(this, op)) {
         // return false;
@@ -116,7 +117,7 @@
         }
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
             IOptimizationContext context) throws AlgebricksException {
         PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
@@ -152,8 +153,8 @@
                     if (gby.getNestedPlans().size() == 1) {
                         ILogicalPlan p0 = gby.getNestedPlans().get(0);
                         if (p0.getRoots().size() == 1) {
-                            if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE
-                                    || gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
+                            if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+                                    .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
                                 if (!topLevelOp) {
                                     throw new NotImplementedException(
                                             "External hash group-by for nested grouping is not implemented.");
@@ -213,8 +214,8 @@
                         }
                     }
                     if (topLevelOp) {
-                        op.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig
-                                .getMaxFramesExternalSort()));
+                        op.setPhysicalOperator(
+                                new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
                     } else {
                         op.setPhysicalOperator(new InMemoryStableSortPOperator());
                     }
@@ -282,12 +283,13 @@
                         additionalFilteringKeys = new ArrayList<LogicalVariable>();
                         getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
-                    op.setPhysicalOperator(new WriteResultPOperator(opLoad.getDataSource(), payload, keys,
-                            additionalFilteringKeys));
+                    op.setPhysicalOperator(
+                            new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
                     break;
                 }
-                case INSERT_DELETE: {
-                    InsertDeleteOperator opLoad = (InsertDeleteOperator) op;
+                case INSERT_DELETE_UPSERT: {
+                    // Primary index
+                    InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
                     LogicalVariable payload;
                     List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> additionalFilteringKeys = null;
@@ -297,16 +299,17 @@
                         getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
                     if (opLoad.isBulkload()) {
-                        op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad
-                                .getDataSource()));
+                        op.setPhysicalOperator(
+                                new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource()));
                     } else {
-                        op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
-                                .getDataSource()));
+                        op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
+                                opLoad.getDataSource(), opLoad.getOperation(), opLoad.getPrevRecordVar()));
                     }
                     break;
                 }
-                case INDEX_INSERT_DELETE: {
-                    IndexInsertDeleteOperator opInsDel = (IndexInsertDeleteOperator) op;
+                case INDEX_INSERT_DELETE_UPSERT: {
+                    // Secondary index
+                    IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
                     List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> additionalFilteringKeys = null;
@@ -317,13 +320,24 @@
                         getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
                     if (opInsDel.isBulkload()) {
-                        op.setPhysicalOperator(new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
-                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                        op.setPhysicalOperator(
+                                new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+                                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
                     } else {
-                        op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
-                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                        List<LogicalVariable> prevSecondaryKeys = null;
+                        LogicalVariable prevAdditionalFilteringKey = null;
+                        if (opInsDel.getOperation() == Kind.UPSERT) {
+                            prevSecondaryKeys = new ArrayList<LogicalVariable>();
+                            getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
+                            if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
+                                prevAdditionalFilteringKey = ((VariableReferenceExpression) (opInsDel
+                                        .getPrevAdditionalFilteringExpression()).getValue()).getVariableReference();
+                            }
+                        }
+                        op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
+                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
+                                prevSecondaryKeys, prevAdditionalFilteringKey));
                     }
-
                     break;
 
                 }
@@ -335,8 +349,8 @@
                     getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
                     // Tokenize Operator only operates with a bulk load on a data set with an index
                     if (opTokenize.isBulkload()) {
-                        op.setPhysicalOperator(new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize
-                                .getDataSourceIndex()));
+                        op.setPhysicalOperator(
+                                new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
                     }
                     break;
                 }
@@ -415,8 +429,8 @@
         int n = aggOp.getExpressions().size();
         List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
         for (int i = 0; i < n; i++) {
-            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
-                    originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+                    .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
             if (mergeExpr == null) {
                 return false;
             }
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index a89e895..70d4f80 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -20,6 +20,7 @@
 
 import java.io.DataInputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.hyracks.api.comm.FrameConstants;
 import org.apache.hyracks.api.comm.FrameHelper;
@@ -72,9 +73,8 @@
 
     @Override
     public int getTupleStartOffset(int tupleIndex) {
-        int offset = tupleIndex == 0 ?
-                FrameConstants.TUPLE_START_OFFSET :
-                IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+        int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET
+                : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
         return start + offset;
     }
 
@@ -85,16 +85,15 @@
 
     @Override
     public int getTupleEndOffset(int tupleIndex) {
-        return start + IntSerDeUtils
-                .getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
+        return start
+                + IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
     }
 
     @Override
     public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return fIdx == 0 ?
-                0 :
-                IntSerDeUtils
-                        .getInt(buffer.array(), getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
+        return fIdx == 0 ? 0
+                : IntSerDeUtils.getInt(buffer.array(),
+                        getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
     }
 
     @Override
@@ -161,4 +160,44 @@
     public int getFieldCount() {
         return recordDescriptor.getFieldCount();
     }
+
+    /*
+     * The two methods below can be used for debugging.
+     * They are safe as they don't print records. Printing records
+     * using IserializerDeserializer can print incorrect results or throw exceptions.
+     * A better way yet would be to use record pointable.
+     */
+    public void prettyPrint(String prefix, int[] recordFields) {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        int tc = getTupleCount();
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("TC: " + tc).append("\n");
+        for (int i = 0; i < tc; ++i) {
+            prettyPrint(i, bbis, dis, sb, recordFields);
+        }
+        System.err.println(sb.toString());
+    }
+
+    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
+            int[] recordFields) {
+        Arrays.sort(recordFields);
+        sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < getFieldCount(); ++j) {
+            sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
+            sb.append("{");
+            bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
+            try {
+                if (Arrays.binarySearch(recordFields, j) >= 0) {
+                    sb.append("a record field: only print using pointable");
+                } else {
+                    sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+                }
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+            sb.append("}");
+        }
+        sb.append("\n");
+    }
 }
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
index abf9c37..15ebe52 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
@@ -21,9 +21,9 @@
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 
-public final class FrameTupleReference implements IFrameTupleReference {
-    private IFrameTupleAccessor fta;
-    private int tIndex;
+public class FrameTupleReference implements IFrameTupleReference {
+    protected IFrameTupleAccessor fta;
+    protected int tIndex;
 
     public void reset(IFrameTupleAccessor fta, int tIndex) {
         this.fta = fta;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
index 0399ede..f1a411b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -70,10 +70,12 @@
         this.highKeyCmp = highKeyCmp;
     }
 
+    @Override
     public MultiComparator getLowKeyComparator() {
         return lowKeyCmp;
     }
 
+    @Override
     public MultiComparator getHighKeyComparator() {
         return highKeyCmp;
     }
@@ -86,6 +88,7 @@
         this.highKeyCmp = highKeyCmp;
     }
 
+    @Override
     public ITupleReference getLowKey() {
         return lowKey;
     }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index 5aac5a6..7ec4a30 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -22,33 +22,42 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 /**
- * This operation callback allows for arbitrary actions to be taken while traversing 
- * an index structure. The {@link IModificationOperationCallback} will be called on 
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link IModificationOperationCallback} will be called on
  * all modifying operations (e.g. insert, update, delete...) for all indexes.
- * 
  * @author zheilbron
  */
 public interface IModificationOperationCallback {
+    public enum Operation {
+        INSERT,
+        DELETE
+    }
 
     /**
-     * This method is called on a tuple that is about to traverse an index's structure 
-     * (i.e. before any pages are pinned or latched). 
-     *
+     * This method is called on a tuple that is about to traverse an index's structure
+     * (i.e. before any pages are pinned or latched).
      * The format of the tuple is the format that would be stored in the index itself.
-     * 
-     * @param tuple the tuple that is about to be operated on
+     * @param tuple
+     *            the tuple that is about to be operated on
      */
     public void before(ITupleReference tuple) throws HyracksDataException;
 
     /**
-     * This method is called on a tuple when a tuple with a matching key is found for the 
-     * current operation. It is possible that tuple is null, in which case no tuple with a 
+     * This method is called on a tuple when a tuple with a matching key is found for the
+     * current operation. It is possible that tuple is null, in which case no tuple with a
      * matching key was found.
-     * 
-     * When found is called, the leaf page where the tuple resides will be pinned and latched, 
+     * When found is called, the leaf page where the tuple resides will be pinned and latched,
      * so blocking operations should be avoided.
-     * 
-     * @param tuple a tuple with a matching key, otherwise null if none exists
+     * @param tuple
+     *            a tuple with a matching key, otherwise null if none exists
      */
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException;
+
+    /**
+     * This call specifies the next operation to be performed. It is used to allow
+     * a single operator to perform different operations per tuple
+     * @param op
+     * @throws HyracksDataException
+     */
+    public void setOp(Operation op) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
index 08ad1d0..9b1cd47 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
@@ -22,50 +22,56 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 /**
- * This operation callback allows for arbitrary actions to be taken while traversing 
- * an index structure. The {@link ISearchOperationCallback} will be called on 
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link ISearchOperationCallback} will be called on
  * all search operations for ordered indexes only.
- * 
  * @author zheilbron
  */
 public interface ISearchOperationCallback {
 
     /**
-     * During an index search operation, this method will be called on tuples as they are 
-     * passed by with a search cursor. This call will be invoked while a leaf page is latched 
-     * and pinned. If the call returns false, then the page will be unlatched and unpinned and 
-     * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded 
+     * After the harness enters the operation components and before an index search operation starts,
+     * this method will be called on the search key.
+     * @param tuple
+     *            the tuple containing the search key (expected to be a point search key)
+     */
+    public void before(ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * During an index search operation, this method will be called on tuples as they are
+     * passed by with a search cursor. This call will be invoked while a leaf page is latched
+     * and pinned. If the call returns false, then the page will be unlatched and unpinned and
+     * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded
      * on.
-     * 
-     * @param tuple the tuple that is being passed over by the search cursor
+     * @param tuple
+     *            the tuple that is being passed over by the search cursor
      * @return true to proceed otherwise false to unlatch and unpin, leading to reconciliation
      */
     public boolean proceed(ITupleReference tuple) throws HyracksDataException;
 
     /**
-     * This method is only called on a tuple that was not 'proceeded' on 
-     * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile 
-     * by performing any necessary actions before resuming the search (e.g. a try-lock may have 
+     * This method is only called on a tuple that was not 'proceeded' on
+     * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile
+     * by performing any necessary actions before resuming the search (e.g. a try-lock may have
      * failed in the proceed call, and now in reconcile we should take a full (blocking) lock).
-     * 
-     * @param tuple the tuple that failed to proceed
+     * @param tuple
+     *            the tuple that failed to proceed
      */
     public void reconcile(ITupleReference tuple) throws HyracksDataException;
 
     /**
-     * This method is only called on a tuple that was reconciled on, but not found after 
-     * retraversing. This method allows an opportunity to cancel some action that was taken in 
+     * This method is only called on a tuple that was reconciled on, but not found after
+     * retraversing. This method allows an opportunity to cancel some action that was taken in
      * {@link #reconcile(ITupleReference))}.
-     * 
-     * @param tuple the tuple that was previously reconciled
+     * @param tuple
+     *            the tuple that was previously reconciled
      */
     public void cancel(ITupleReference tuple) throws HyracksDataException;
-    
+
     /**
      * This method is only called on a tuple that was reconciled on, and found after
-     * retraversing. This method allows an opportunity to do some subsequent action that was 
+     * retraversing. This method allows an opportunity to do some subsequent action that was
      * taken in {@link #reconcile(ITupleReference))}.
-     * 
      * @param tuple
      *            the tuple that was previously reconciled
      */
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
index 94a6a27..1c0d143 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
@@ -21,10 +21,20 @@
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 
 public interface ISearchPredicate extends Serializable {
-	public MultiComparator getLowKeyComparator();
+    public MultiComparator getLowKeyComparator();
 
-	public MultiComparator getHighKeyComparator();
+    public MultiComparator getHighKeyComparator();
+
+    /**
+     * Get the search key to be used with point search operation on primary index.
+     * This method will only be called with point search predicates that only happen in primary index.
+     * @return
+     * @throws HyracksDataException
+     */
+    public ITupleReference getLowKey();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index b632ba9..e8ab8dc 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -27,7 +27,7 @@
 /**
  * Dummy operation callback that simply does nothing.
  */
-public enum NoOpOperationCallback implements IModificationOperationCallback, ISearchOperationCallback {
+public enum NoOpOperationCallback implements IModificationOperationCallback,ISearchOperationCallback {
     INSTANCE;
 
     @Override
@@ -42,12 +42,12 @@
 
     @Override
     public void before(ITupleReference tuple) {
-        // Do nothing.        
+        // Do nothing.
     }
 
     @Override
     public void found(ITupleReference before, ITupleReference after) {
-        // Do nothing.        
+        // Do nothing.
     }
 
     @Override
@@ -59,4 +59,9 @@
     public void complete(ITupleReference tuple) throws HyracksDataException {
         // Do nothing.
     }
+
+    @Override
+    public void setOp(Operation op) throws HyracksDataException {
+        // Do nothing.
+    }
 }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
index 5f27835..b9792c2 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
@@ -19,51 +19,40 @@
 
 package org.apache.hyracks.storage.am.common.tuples;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
-public class PermutingFrameTupleReference implements IFrameTupleReference {
-	private IFrameTupleAccessor fta;
-	private int tIndex;
-	private int[] fieldPermutation;
+public class PermutingFrameTupleReference extends FrameTupleReference {
+    private int[] fieldPermutation;
 
-	public void setFieldPermutation(int[] fieldPermutation) {
-		this.fieldPermutation = fieldPermutation;
-	}
+    public PermutingFrameTupleReference(int[] fieldPermutation) {
+        this.fieldPermutation = fieldPermutation;
+    }
 
-	public void reset(IFrameTupleAccessor fta, int tIndex) {
-		this.fta = fta;
-		this.tIndex = tIndex;
-	}
+    public PermutingFrameTupleReference() {
+    }
 
-	@Override
-	public IFrameTupleAccessor getFrameTupleAccessor() {
-		return fta;
-	}
+    public void setFieldPermutation(int[] fieldPermutation) {
+        this.fieldPermutation = fieldPermutation;
+    }
 
-	@Override
-	public int getTupleIndex() {
-		return tIndex;
-	}
+    @Override
+    public int getFieldCount() {
+        return fieldPermutation.length;
+    }
 
-	@Override
-	public int getFieldCount() {
-		return fieldPermutation.length;
-	}
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fta.getBuffer().array();
+    }
 
-	@Override
-	public byte[] getFieldData(int fIdx) {
-		return fta.getBuffer().array();
-	}
+    @Override
+    public int getFieldStart(int fIdx) {
+        return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
+                + fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
+    }
 
-	@Override
-	public int getFieldStart(int fIdx) {
-		return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
-				+ fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
-	}
-
-	@Override
-	public int getFieldLength(int fIdx) {
-		return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
-	}
+    @Override
+    public int getFieldLength(int fIdx) {
+        return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index ae3aa52..13c6949 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -271,13 +271,13 @@
         operationalComponents.clear();
         switch (ctx.getOperation()) {
             case UPDATE:
-            case UPSERT:
             case PHYSICALDELETE:
             case FLUSH:
             case DELETE:
                 operationalComponents.add(memoryComponents.get(cmc));
                 break;
             case INSERT:
+            case UPSERT:
                 addOperationalMutableComponents(operationalComponents);
                 operationalComponents.addAll(immutableComponents);
                 break;
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 08891c2..fa25524 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -54,8 +54,8 @@
     public IndexOperation op;
     public final MultiComparator cmp;
     public final MultiComparator bloomFilterCmp;
-    public final IModificationOperationCallback modificationCallback;
-    public final ISearchOperationCallback searchCallback;
+    public IModificationOperationCallback modificationCallback;
+    public ISearchOperationCallback searchCallback;
     private final List<ILSMComponent> componentHolder;
     private final List<ILSMComponent> componentsToBeMerged;
     private final List<ILSMComponent> componentsToBeReplicated;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 63e651a..4c4ed28 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -28,8 +28,8 @@
 
 public interface ILSMHarness {
 
-    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
-            IndexException;
+    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+            throws HyracksDataException, IndexException;
 
     public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException, IndexException;
@@ -45,14 +45,14 @@
     public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
-    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-            IndexException;
+    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+            throws HyracksDataException, IndexException;
 
     public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException;
 
-    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-            IndexException;
+    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+            throws HyracksDataException, IndexException;
 
     public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
 
@@ -62,5 +62,4 @@
             LSMOperationType opType) throws HyracksDataException;
 
     public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 08bcfee..99f981d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -39,6 +39,6 @@
     public void setSearchPredicate(ISearchPredicate searchPredicate);
 
     public ISearchPredicate getSearchPredicate();
-    
+
     public List<ILSMComponent> getComponentsToBeReplicated();
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index b172864..d99c9e8 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,17 +22,19 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.common.IStorageManagerInterface;
 import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -49,18 +51,20 @@
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
             IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+            ITupleFilterFactory tupleFilterFactory, INullWriterFactory nullWriterFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider,
+            ISearchOperationCallbackFactory searchOpCallbackProvider) {
         super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false,
-                false, null,
-                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackProvider);
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false, false,
+                nullWriterFactory, NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackProvider,
+                modificationOpCallbackProvider);
         this.fieldPermutation = fieldPermutation;
         this.op = op;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
                 recordDescProvider, op);
     }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 54500a6..21b0d8a 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -81,7 +81,8 @@
                         if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) {
                             //The mutable component has not been modified by any writer. There is nothing to flush.
                             //since the component is empty, set its state back to READABLE_WRITABLE
-                            if (((AbstractLSMIndex) lsmIndex).getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
+                            if (((AbstractLSMIndex) lsmIndex)
+                                    .getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
                                 ((AbstractLSMIndex) lsmIndex)
                                         .setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
                             }
@@ -196,8 +197,8 @@
                         if (c.getType() == LSMComponentType.MEMORY) {
                             switch (c.getState()) {
                                 case READABLE_UNWRITABLE:
-                                    if (isMutableComponent
-                                            && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+                                    if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
+                                            || opType == LSMOperationType.FORCE_MODIFICATION)) {
                                         lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
                                     }
                                     break;
@@ -256,8 +257,8 @@
                     e.printStackTrace();
                     throw e;
                 } finally {
-                    if (failedOperation
-                            && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+                    if (failedOperation && (opType == LSMOperationType.MODIFICATION
+                            || opType == LSMOperationType.FORCE_MODIFICATION)) {
                         //When the operation failed, completeOperation() method must be called
                         //in order to decrement active operation count which was incremented in beforeOperation() method.
                         opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
@@ -314,8 +315,8 @@
     }
 
     @Override
-    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
-            IndexException {
+    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+            throws HyracksDataException, IndexException {
         LSMOperationType opType = LSMOperationType.FORCE_MODIFICATION;
         modify(ctx, false, tuple, opType);
     }
@@ -357,6 +358,7 @@
         ctx.setSearchPredicate(pred);
         getAndEnterComponents(ctx, opType, false);
         try {
+            ctx.getSearchOperationCallback().before(pred.getLowKey());
             lsmIndex.search(ctx, cursor, pred);
         } catch (HyracksDataException | IndexException e) {
             exitComponents(ctx, opType, null, true);
@@ -386,8 +388,8 @@
     }
 
     @Override
-    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-            IndexException {
+    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+            throws HyracksDataException, IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
         }
@@ -434,8 +436,8 @@
     }
 
     @Override
-    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-            IndexException {
+    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+            throws HyracksDataException, IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
         }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 284fc1b..009bd36 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -109,13 +109,13 @@
             OnDiskInvertedIndexFactory diskInvIndexFactory, BTreeFactory deletedKeysBTreeFactory,
             BloomFilterFactory bloomFilterFactory, ILSMComponentFilterFactory filterFactory,
             ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
-            double bloomFilterFalsePositiveRate, ILSMIndexFileManager fileManager,
-            IFileMapProvider diskFileMapProvider, ITypeTraits[] invListTypeTraits,
-            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
-            IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
-            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallback ioOpCallback, int[] invertedIndexFields, int[] filterFields,
-            int[] filterFieldsForNonBulkLoadOps, int[] invertedIndexFieldsForNonBulkLoadOps, boolean durable) throws IndexException {
+            double bloomFilterFalsePositiveRate, ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback, int[] invertedIndexFields,
+            int[] filterFields, int[] filterFieldsForNonBulkLoadOps, int[] invertedIndexFieldsForNonBulkLoadOps,
+            boolean durable) throws IndexException {
         super(virtualBufferCaches, diskInvIndexFactory.getBufferCache(), fileManager, diskFileMapProvider,
                 bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler, ioOpCallback, filterFrameFactory,
                 filterManager, filterFields, durable);
@@ -136,13 +136,14 @@
         for (IVirtualBufferCache virtualBufferCache : virtualBufferCaches) {
             InMemoryInvertedIndex memInvIndex = createInMemoryInvertedIndex(virtualBufferCache,
                     new VirtualMetaDataPageManager(virtualBufferCache.getNumPages()), i);
-            BTree deleteKeysBTree = BTreeUtils.createBTree(virtualBufferCache, new VirtualMetaDataPageManager(
-                    virtualBufferCache.getNumPages()), virtualBufferCache.getFileMapProvider(),
-                    invListTypeTraits, invListCmpFactories, BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File(
-                            fileManager.getBaseDir() + "_virtual_del_" + i)));
+            BTree deleteKeysBTree = BTreeUtils.createBTree(virtualBufferCache,
+                    new VirtualMetaDataPageManager(virtualBufferCache.getNumPages()),
+                    virtualBufferCache.getFileMapProvider(), invListTypeTraits, invListCmpFactories,
+                    BTreeLeafFrameType.REGULAR_NSM,
+                    new FileReference(new File(fileManager.getBaseDir() + "_virtual_del_" + i)));
             LSMInvertedIndexMemoryComponent mutableComponent = new LSMInvertedIndexMemoryComponent(memInvIndex,
-                    deleteKeysBTree, virtualBufferCache, i == 0 ? true : false, filterFactory == null ? null
-                            : filterFactory.createLSMComponentFilter());
+                    deleteKeysBTree, virtualBufferCache, i == 0 ? true : false,
+                    filterFactory == null ? null : filterFactory.createLSMComponentFilter());
             memoryComponents.add(mutableComponent);
             ++i;
         }
@@ -271,6 +272,7 @@
             case FLUSH:
             case DELETE:
             case INSERT:
+            case UPSERT:
                 operationalComponents.add(memoryComponents.get(cmc));
                 break;
             case SEARCH:
@@ -336,8 +338,8 @@
             indexTuple = tuple;
         }
 
-        ctx.modificationCallback.before(indexTuple);
-        ctx.modificationCallback.found(null, indexTuple);
+        ctx.getModificationCallback().before(indexTuple);
+        ctx.getModificationCallback().found(null, indexTuple);
         switch (ctx.getOperation()) {
             case INSERT: {
                 // Insert into the in-memory inverted index.
@@ -362,8 +364,8 @@
         }
         if (ctx.filterTuple != null) {
             ctx.filterTuple.reset(tuple);
-            memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter()
-                    .update(ctx.filterTuple, ctx.filterCmp);
+            memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(ctx.filterTuple,
+                    ctx.filterCmp);
         }
     }
 
@@ -383,9 +385,8 @@
                 IIndexAccessor invIndexAccessor = ((LSMInvertedIndexMemoryComponent) component).getInvIndex()
                         .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 indexAccessors.add(invIndexAccessor);
-                IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexMemoryComponent) component)
-                        .getDeletedKeysBTree().createAccessor(NoOpOperationCallback.INSTANCE,
-                                NoOpOperationCallback.INSTANCE);
+                IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexMemoryComponent) component).getDeletedKeysBTree()
+                        .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 deletedKeysBTreeAccessors.add(deletedKeysAccessor);
             } else {
                 IIndexAccessor invIndexAccessor = ((LSMInvertedIndexDiskComponent) component).getInvIndex()
@@ -415,8 +416,8 @@
             initState = new LSMInvertedIndexSearchCursorInitialState(keyCmp, keysOnlyTuple, indexAccessors,
                     deletedKeysBTreeAccessors,
                     ((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()))
-                            .getDeletedKeysBTree().getLeafFrameFactory(), ictx, includeMutableComponent, lsmHarness,
-                    operationalComponents);
+                            .getDeletedKeysBTree().getLeafFrameFactory(),
+                    ictx, includeMutableComponent, lsmHarness, operationalComponents);
         } else {
             LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) memoryComponents
                     .get(currentMutableComponentId.get());
@@ -424,8 +425,9 @@
             MultiComparator tokensAndKeysCmp = MultiComparator.create(memInvIndex.getBTree().getComparatorFactories());
             initState = new LSMInvertedIndexRangeSearchCursorInitialState(tokensAndKeysCmp, keyCmp, keysOnlyTuple,
                     ((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()))
-                            .getDeletedKeysBTree().getLeafFrameFactory(), includeMutableComponent, lsmHarness,
-                    indexAccessors, deletedKeysBTreeAccessors, pred, operationalComponents);
+                            .getDeletedKeysBTree().getLeafFrameFactory(),
+                    includeMutableComponent, lsmHarness, indexAccessors, deletedKeysBTreeAccessors, pred,
+                    operationalComponents);
         }
         return initState;
     }
@@ -453,8 +455,8 @@
         opCtx.setOperation(IndexOperation.FLUSH);
         opCtx.getComponentHolder().add(flushingComponent);
         ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(
-                new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
-                        .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
+                new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent,
+                componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
                 componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
     }
 
@@ -494,13 +496,12 @@
             filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
             filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilterInfo(component.getLSMComponentFilter(),
-                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()
-            );
+                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
         }
         invIndexBulkLoader.end();
 
-        IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree().createAccessor(
-                NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree()
+                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         IIndexCursor btreeCountingCursor = ((BTreeAccessor) deletedKeysBTreeAccessor).createCountingSearchCursor();
         deletedKeysBTreeAccessor.search(btreeCountingCursor, nullPred);
         long numBTreeTuples = 0L;
@@ -549,7 +550,8 @@
     @Override
     public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException {
-        LSMInvertedIndexOpContext ictx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        LSMInvertedIndexOpContext ictx = createOpContext(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
         ictx.setOperation(IndexOperation.MERGE);
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
         IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ictx);
@@ -648,8 +650,7 @@
             }
             filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilterInfo(component.getLSMComponentFilter(),
-                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()
-            );
+                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
         }
         invIndexBulkLoader.end();
 
@@ -676,7 +677,8 @@
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex, boolean appendOnly) throws IndexException {
         try {
-            return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, appendOnly);
+            return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                    appendOnly);
         } catch (HyracksDataException e) {
             throw new IndexException(e);
         }
@@ -706,10 +708,10 @@
             }
             invIndexBulkLoader = ((LSMInvertedIndexDiskComponent) component).getInvIndex().createBulkLoader(fillFactor,
                     verifyInput, numElementsHint, false, true);
-            
+
             //validity of the component depends on the deleted keys file being there even if it's empty.
-            deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false, true);
+            deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree()
+                    .createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true);
 
             if (filterFields != null) {
                 indexTuple = new PermutingTupleReference(invertedIndexFields);
@@ -782,11 +784,11 @@
 
         @Override
         public void abort() throws HyracksDataException {
-            if( invIndexBulkLoader != null){
+            if (invIndexBulkLoader != null) {
                 invIndexBulkLoader.abort();
             }
 
-            if(deletedKeysBTreeBulkLoader != null){
+            if (deletedKeysBTreeBulkLoader != null) {
                 deletedKeysBTreeBulkLoader.abort();
             }
         }
@@ -800,11 +802,10 @@
     }
 
     protected LSMInvertedIndexDiskComponent createDiskInvIndexComponent(ILSMComponentFactory factory,
-            FileReference dictBTreeFileRef, FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean create)
-            throws HyracksDataException, IndexException {
-        LSMInvertedIndexDiskComponent component = (LSMInvertedIndexDiskComponent) factory
-                .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef,
-                        bloomFilterFileRef));
+            FileReference dictBTreeFileRef, FileReference btreeFileRef, FileReference bloomFilterFileRef,
+            boolean create) throws HyracksDataException, IndexException {
+        LSMInvertedIndexDiskComponent component = (LSMInvertedIndexDiskComponent) factory.createLSMComponentInstance(
+                new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef, bloomFilterFileRef));
         if (create) {
             component.getBloomFilter().create();
             component.getBloomFilter().activate();
@@ -884,10 +885,10 @@
         LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) lsmComponent;
         OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex();
         IBufferCache bufferCache = invIndex.getBufferCache();
-        markAsValidInternal(invIndex.getBufferCache(),invIndexComponent.getBloomFilter());
+        markAsValidInternal(invIndex.getBufferCache(), invIndexComponent.getBloomFilter());
 
         // Flush inverted index second.
-        bufferCache.force(invIndex.getInvListsFileId(),true);
+        bufferCache.force(invIndex.getInvListsFileId(), true);
         markAsValidInternal(invIndex.getBTree());
 
         // Flush deleted keys BTree.
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 49b1b03..c511a67 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -44,8 +44,8 @@
     private final List<ILSMComponent> componentsToBeMerged;
     private final List<ILSMComponent> componentsToBeReplicated;
 
-    public final IModificationOperationCallback modificationCallback;
-    public final ISearchOperationCallback searchCallback;
+    private IModificationOperationCallback modificationCallback;
+    private ISearchOperationCallback searchCallback;
 
     // Tuple that only has the inverted-index elements (aka keys), projecting away the document fields.
     public PermutingTupleReference keysOnlyTuple;
@@ -79,10 +79,10 @@
         for (int i = 0; i < mutableComponents.size(); i++) {
             LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) mutableComponents
                     .get(i);
-            mutableInvIndexAccessors[i] = (IInvertedIndexAccessor) mutableComponent.getInvIndex().createAccessor(
-                    NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            deletedKeysBTreeAccessors[i] = mutableComponent.getDeletedKeysBTree().createAccessor(
-                    NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            mutableInvIndexAccessors[i] = (IInvertedIndexAccessor) mutableComponent.getInvIndex()
+                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            deletedKeysBTreeAccessors[i] = mutableComponent.getDeletedKeysBTree()
+                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         }
 
         assert mutableComponents.size() > 0;
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
index 91058af..274e399 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
@@ -48,11 +48,10 @@
 
     private final List<ILSMComponent> operationalComponents;
 
-    public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp,
-            PermutingTupleReference keysOnlyTuple, List<IIndexAccessor> indexAccessors,
-            List<IIndexAccessor> deletedKeysBTreeAccessors, ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory,
-            IIndexOperationContext ctx, boolean includeMemComponent, ILSMHarness lsmHarness,
-            List<ILSMComponent> operationalComponents) {
+    public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp, PermutingTupleReference keysOnlyTuple,
+            List<IIndexAccessor> indexAccessors, List<IIndexAccessor> deletedKeysBTreeAccessors,
+            ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory, IIndexOperationContext ctx,
+            boolean includeMemComponent, ILSMHarness lsmHarness, List<ILSMComponent> operationalComponents) {
         this.keyCmp = keyCmp;
         this.keysOnlyTuple = keysOnlyTuple;
         this.indexAccessors = indexAccessors;
@@ -62,7 +61,7 @@
         this.operationalComponents = operationalComponents;
         this.lsmHarness = lsmHarness;
         this.ctx = (LSMInvertedIndexOpContext) ctx;
-        this.searchCallback = this.ctx.searchCallback;
+        this.searchCallback = this.ctx.getSearchOperationCallback();
     }
 
     @Override
@@ -121,7 +120,7 @@
     public List<IIndexAccessor> getDeletedKeysBTreeAccessors() {
         return deletedKeysBTreeAccessors;
     }
-    
+
     public ITreeIndexFrameFactory getgetDeletedKeysBTreeLeafFrameFactory() {
         return deletedKeysBtreeLeafFrameFactory;
     }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java
index f379759..e37f007 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java
@@ -80,4 +80,9 @@
         // TODO: This doesn't make sense for an inverted index. Change ISearchPredicate interface.
         return null;
     }
+
+    @Override
+    public ITupleReference getLowKey() {
+        return null;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 61284f0..bb0c8d6 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -214,6 +214,7 @@
             case INSERT:
             case DELETE:
             case FLUSH:
+            case UPSERT:
                 operationalComponents.add(memoryComponents.get(cmc));
                 break;
             case SEARCH:
@@ -251,14 +252,6 @@
     public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
             throws HyracksDataException, IndexException {
         LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
-        //List<ILSMComponent> operationalComponents = ictx.getComponentHolder();
-
-        /*
-        LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(rtreeLeafFrameFactory,
-                rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), lsmHarness,
-                comparatorFields, linearizerArray, ctx.searchCallback, operationalComponents);
-        */
-
         cursor.open(ctx.searchInitialState, pred);
     }
 
@@ -353,8 +346,8 @@
             indexTuple = tuple;
         }
 
-        ctx.modificationCallback.before(indexTuple);
-        ctx.modificationCallback.found(null, indexTuple);
+        ctx.getModificationCallback().before(indexTuple);
+        ctx.getModificationCallback().found(null, indexTuple);
         if (ctx.getOperation() == IndexOperation.INSERT) {
             ctx.currentMutableRTreeAccessor.insert(indexTuple);
         } else {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index f839e15..3c91d62 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -123,7 +123,6 @@
     /**
      * Opens LSMRTree, cleaning up invalid files from base dir, and registering
      * all valid files as on-disk RTrees and BTrees.
-     * 
      * @throws HyracksDataException
      */
     @Override
@@ -493,8 +492,8 @@
             indexTuple = tuple;
         }
 
-        ctx.modificationCallback.before(indexTuple);
-        ctx.modificationCallback.found(null, indexTuple);
+        ctx.getModificationCallback().before(indexTuple);
+        ctx.getModificationCallback().found(null, indexTuple);
         if (ctx.getOperation() == IndexOperation.INSERT) {
             ctx.currentMutableRTreeAccessor.insert(indexTuple);
         } else {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 2238af0..96c3380 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -55,8 +55,8 @@
     public final List<ILSMComponent> componentHolder;
     private final List<ILSMComponent> componentsToBeMerged;
     private final List<ILSMComponent> componentsToBeReplicated;
-    public final IModificationOperationCallback modificationCallback;
-    public final ISearchOperationCallback searchCallback;
+    private IModificationOperationCallback modificationCallback;
+    private ISearchOperationCallback searchCallback;
     public final PermutingTupleReference indexTuple;
     public final MultiComparator filterCmp;
     public final PermutingTupleReference filterTuple;
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
index 5d8ce2e..6370458 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -218,7 +218,7 @@
 
         pred = (SearchPredicate) searchPred;
         cmp = pred.getLowKeyComparator();
-        searchKey = pred.getSearchKey();
+        searchKey = pred.getLowKey();
 
         if (searchKey != null) {
             int maxFieldPos = cmp.getKeyFieldCount() / 2;
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java
index 643eb8e..da03608 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java
@@ -42,7 +42,8 @@
         this.cmp = cmp;
     }
 
-    public ITupleReference getSearchKey() {
+    @Override
+    public ITupleReference getLowKey() {
         return searchKey;
     }
 
@@ -50,10 +51,12 @@
         this.searchKey = searchKey;
     }
 
+    @Override
     public MultiComparator getLowKeyComparator() {
         return cmp;
     }
 
+    @Override
     public MultiComparator getHighKeyComparator() {
         return cmp;
     }
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
index eebbf66..a9b2e1f 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -18,11 +18,6 @@
  */
 package org.apache.hyracks.storage.am.btree;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -32,6 +27,10 @@
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 public abstract class AbstractModificationOperationCallbackTest extends AbstractOperationCallbackTest {
 
@@ -48,11 +47,13 @@
         this.isFoundNull = true;
     }
 
+    @Override
     @Before
     public void setup() throws Exception {
         super.setup();
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
         super.tearDown();
@@ -98,6 +99,10 @@
             Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, after));
         }
 
+        @Override
+        public void setOp(Operation op) throws HyracksDataException {
+        }
+
     }
 
 }
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
index 2094676..20a2c0f 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
@@ -26,11 +26,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -41,6 +36,10 @@
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 public abstract class AbstractSearchOperationCallbackTest extends AbstractOperationCallbackTest {
     private static final int NUM_TASKS = 2;
@@ -57,12 +56,14 @@
         this.insertTaskStarted = false;
     }
 
+    @Override
     @Before
     public void setup() throws Exception {
         executor = Executors.newFixedThreadPool(NUM_TASKS);
         super.setup();
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
         executor.shutdown();
@@ -192,7 +193,10 @@
 
             @Override
             public void complete(ITupleReference tuple) throws HyracksDataException {
+            }
 
+            @Override
+            public void before(ITupleReference tuple) throws HyracksDataException {
             }
 
         }
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
index 8e6a11d..15d8a60 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 
-public enum TestOperationCallback implements ISearchOperationCallback, IModificationOperationCallback {
+public enum TestOperationCallback implements ISearchOperationCallback,IModificationOperationCallback {
     INSTANCE;
 
     private static final int RANDOM_SEED = 50;
@@ -48,12 +48,12 @@
 
     @Override
     public void before(ITupleReference tuple) {
-        // Do nothing.        
+        // Do nothing.
     }
 
     @Override
     public void found(ITupleReference before, ITupleReference after) {
-        // Do nothing.        
+        // Do nothing.
     }
 
     @Override
@@ -66,4 +66,9 @@
         // Do nothing.
     }
 
+    @Override
+    public void setOp(Operation op) throws HyracksDataException {
+        // Do nothing.
+    }
+
 }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 6712d1c..20dfb9c 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -22,9 +22,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -41,6 +38,8 @@
 import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
 import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperationCallbackTest {
     private final LSMBTreeTestHarness harness;
@@ -207,6 +206,9 @@
                         expectedTupleToBeLockedValue);
             }
 
+            @Override
+            public void before(ITupleReference tuple) throws HyracksDataException {
+            }
         }
     }