Using btree upsert in lsmbtree
Added mechanism for passing around operation callbacks


git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1307 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 301d1bd..88fef58 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
@@ -151,9 +152,9 @@
 
         // create operator descriptor
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider,
-                primaryTypeTraits, primaryComparatorFactories,
-                primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+                spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, primaryInsert, splitNCs);
 
         // prepare insertion into secondary index
@@ -175,9 +176,9 @@
                 options.secondaryBTreeName);
         // create operator descriptor
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider,
-                secondaryTypeTraits,
-                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+                spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, secondaryInsert, splitNCs);
 
         // end the insert pipeline at this sink operator
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 9375285..8f77c15 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will load a primary index from randomly generated data
@@ -152,8 +153,8 @@
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, btreeSplitProvider,
-                typeTraits, comparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // distribute the records from the datagen via hashing to the bulk load
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index 76aa660..1a43a0b 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -45,6 +45,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will perform an ordered scan on the primary index
@@ -142,8 +143,8 @@
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, storageManager,
-                indexRegistryProvider, btreeSplitProvider, typeTraits,
-                comparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
+                indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeSearchOp, splitNCs);
 
         // have each node print the results of its respective B-Tree
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 729fae4..11c3ed6 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -41,6 +41,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDiskOrderScanOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will load a secondary index with <key, primary-index key> pairs
@@ -122,8 +123,8 @@
         IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new TreeIndexDiskOrderScanOperatorDescriptor(spec,
-                recDesc, storageManager, indexRegistryProvider, primarySplitProvider, 
-                primaryTypeTraits, dataflowHelperFactory);
+                recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs);
 
         // sort the tuples as preparation for bulk load into secondary index
@@ -144,9 +145,8 @@
         int[] fieldPermutation = { 1, 0 };
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, btreeSplitProvider,
-                secondaryTypeTraits, comparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
+                fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // connect the ops
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 67aa546..5f9d2bc 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -45,6 +45,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will perform range search on the secondary index
@@ -168,9 +169,9 @@
                 options.secondaryBTreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
-                storageManager, indexRegistryProvider, secondarySplitProvider, 
-                secondaryTypeTraits, searchComparatorFactories, secondaryLowKeyFields,
-                secondaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                searchComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, secondarySearchOp, splitNCs);
 
         // secondary index will output tuples with [UTF8String, Integer]
@@ -184,9 +185,9 @@
 
         IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primarySplitProvider, 
-                primaryTypeTraits, primaryComparatorFactories, primaryLowKeyFields,
-                primaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, primarySearchOp, splitNCs);
 
         // have each node print the results of its respective B-Tree
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 9d252bc..f512ba0 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
@@ -158,7 +159,8 @@
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+                primaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -194,7 +196,8 @@
         // scan primary index
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         // sort based on secondary keys
@@ -207,7 +210,8 @@
         int[] fieldPermutation = { 3, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+                secondaryComparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
@@ -218,8 +222,8 @@
         runTest(spec);
     }
 
-    protected void insertPipeline() throws Exception {
-
+    protected void insertPipeline(boolean useUpsert) throws Exception {
+        IndexOp pipelineOperation = useUpsert ? IndexOp.INSERT : IndexOp.UPSERT;
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
@@ -244,14 +248,16 @@
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryBtreeInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, ordersDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+                primaryComparatorFactories, primaryFieldPermutation, pipelineOperation, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeInsertOp, NC1_ID);
 
         // first secondary index
         int[] fieldPermutationB = { 4, 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, ordersDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutationB, IndexOp.INSERT, dataflowHelperFactory);
+                secondaryComparatorFactories, fieldPermutationB, pipelineOperation, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java
index 66ef309..3bdaa5a 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexScanOperatorTest.java
@@ -35,9 +35,10 @@
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 public class BTreePrimaryIndexScanOperatorTest extends AbstractBTreeOperatorTest {
-    
+
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -68,9 +69,9 @@
         int[] highKeyFields = null; // + infinity
 
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primarySplitProvider,
-                primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
-                highKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -84,7 +85,7 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     @Override
     protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
         return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java
index 1ecb84c..878507b 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexSearchOperatorTest.java
@@ -35,9 +35,10 @@
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 public class BTreePrimaryIndexSearchOperatorTest extends AbstractBTreeOperatorTest {
-    
+
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -73,9 +74,9 @@
         int[] highKeyFields = { 1 };
 
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primarySplitProvider, 
-                primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
-                highKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -89,7 +90,7 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     @Override
     protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
         return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java
index 22f97f7..326a4b9 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreePrimaryIndexStatsOperatorTest.java
@@ -28,22 +28,23 @@
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexStatsOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 public class BTreePrimaryIndexStatsOperatorTest extends AbstractBTreeOperatorTest {
-    
+
     @Before
     public void setup() throws Exception {
         super.setup();
         loadPrimaryIndex();
     }
-    
+
     @Test
     public void showPrimaryIndexStats() throws Exception {
         JobSpecification spec = new JobSpecification();
 
         TreeIndexStatsOperatorDescriptor primaryStatsOp = new TreeIndexStatsOperatorDescriptor(spec, storageManager,
-                indexRegistryProvider, primarySplitProvider,
-                primaryTypeTraits, primaryComparatorFactories, dataflowHelperFactory);
+                indexRegistryProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryStatsOp, NC1_ID);
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
                 createTempFile().getAbsolutePath()) });
@@ -54,7 +55,7 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     @Override
     protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
         return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java
index e48294f..50bc259 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexInsertOperatorTest.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 public class BTreeSecondaryIndexInsertOperatorTest extends AbstractBTreeOperatorTest {
 
@@ -43,7 +44,7 @@
         super.setup();
         loadPrimaryIndex();
         loadSecondaryIndex();
-        insertPipeline();
+        insertPipeline(false);
     }
 
     @Test
@@ -76,9 +77,9 @@
 
         // search secondary index
         BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
-                secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider,
-                secondaryTypeTraits, secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
-                dataflowHelperFactory);
+                secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
 
         // second field from the tuples coming from secondary index
@@ -88,9 +89,9 @@
 
         // search primary index
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primarySplitProvider, 
-                primaryTypeTraits, primaryComparatorFactories, primaryLowKeyFields,
-                primaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -105,7 +106,7 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     @Override
     protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
         return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java
index b1b6e39..cc8fe12 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexSearchOperatorTest.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 public class BTreeSecondaryIndexSearchOperatorTest extends AbstractBTreeOperatorTest {
 
@@ -77,7 +78,7 @@
         BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
                 secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
                 secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
-                dataflowHelperFactory);
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
 
         int[] primaryLowKeyFields = { 1 }; // second field from the tuples
@@ -89,7 +90,7 @@
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
                 primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
-                dataflowHelperFactory);
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexUpsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexUpsertOperatorTest.java
new file mode 100644
index 0000000..374d1a0
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/BTreeSecondaryIndexUpsertOperatorTest.java
@@ -0,0 +1,99 @@
+package edu.uci.ics.hyracks.tests.am.btree;
+
+import java.io.DataOutput;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+
+public class BTreeSecondaryIndexUpsertOperatorTest extends AbstractBTreeOperatorTest {
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        loadPrimaryIndex();
+        loadSecondaryIndex();
+        insertPipeline(true);
+    }
+
+    @Test
+    public void searchUpdatedSecondaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // build tuple containing search keys (only use the first key as search
+        // key)
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(secondaryKeyFieldCount);
+        DataOutput dos = tb.getDataOutput();
+
+        tb.reset();
+        // low key
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("1998-07-21", dos);
+        tb.addFieldEndOffset();
+        // high key
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("2000-10-18", dos);
+        tb.addFieldEndOffset();
+
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
+
+        int[] secondaryLowKeyFields = { 0 };
+        int[] secondaryHighKeyFields = { 1 };
+
+        // search secondary index
+        BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
+                secondaryRecDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
+
+        // second field from the tuples coming from secondary index
+        int[] primaryLowKeyFields = { 1 };
+        // second field from the tuples coming from secondary index
+        int[] primaryHighKeyFields = { 1 };
+
+        // search primary index
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
+
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBtreeSearchOp, 0, primaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Override
+    protected IIndexDataflowHelperFactory createDataFlowHelperFactory() {
+        return ((BTreeOperatorTestHelper) testHelper).createDataFlowHelperFactory();
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index a3afcbd..c54deea 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -56,6 +56,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifierFactory;
 import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.InvertedIndexBulkLoadOperatorDescriptor;
@@ -165,7 +166,8 @@
         int[] fieldPermutation = { 0, 1 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory);
+                primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
         return primaryBtreeBulkLoad;
     }
@@ -191,7 +193,8 @@
         int[] highKeyFields = null; // + infinity
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory);
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
         return primaryBtreeSearchOp;
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 4848920..e35d181 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -55,6 +55,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -203,7 +204,8 @@
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory);
+                primaryComparatorFactories, fieldPermutation, 0.7f, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -239,14 +241,16 @@
         // scan primary index
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory);
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primarySearchOp, NC1_ID);
 
         // load secondary index
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutation, 0.7f, rtreeDataflowHelperFactory);
+                secondaryComparatorFactories, fieldPermutation, 0.7f, rtreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
@@ -286,14 +290,16 @@
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, ordersDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, btreeDataflowHelperFactory);
+                primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryInsertOp, NC1_ID);
 
         // secondary index
         int[] secondaryFieldPermutation = { 6, 7, 8, 9, 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, ordersDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, rtreeDataflowHelperFactory);
+                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, rtreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index d75c0bc..83fc78a 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 
 public class RTreeSecondaryIndexInsertOperatorTest extends AbstractRTreeOperatorTest {
@@ -80,7 +81,8 @@
 
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
                 storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory);
+                secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
 
         // fifth field from the tuples coming from secondary index
@@ -92,7 +94,7 @@
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
                 primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
-                btreeDataflowHelperFactory);
+                btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primarySearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 21fbd91..0d1b319 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 
 public class RTreeSecondaryIndexScanOperatorTest extends AbstractRTreeOperatorTest {
@@ -78,7 +79,8 @@
 
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
                 storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory);
+                secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 0e3203d..e804a45 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 
 public class RTreeSecondaryIndexSearchOperatorTest extends AbstractRTreeOperatorTest {
@@ -79,7 +80,8 @@
 
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
                 storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory);
+                secondaryComparatorFactories, keyFields, rtreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
 
         // fifth field from the tuples coming from secondary index
@@ -91,7 +93,7 @@
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
                 primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
-                btreeDataflowHelperFactory);
+                btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primarySearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java
index 7c308cd..d858b95 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/RTreeSecondaryIndexStatsOperatorTest.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexStatsOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 public class RTreeSecondaryIndexStatsOperatorTest extends AbstractRTreeOperatorTest {
 
@@ -46,7 +47,7 @@
 
         TreeIndexStatsOperatorDescriptor secondaryStatsOp = new TreeIndexStatsOperatorDescriptor(spec, storageManager,
                 indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
-                rtreeDataflowHelperFactory);
+                rtreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryStatsOp, NC1_ID);
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
                 createTempFile().getAbsolutePath()) });
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
index a31a57c..9bb3ffa 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
@@ -20,23 +20,22 @@
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
 import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 
 public class BTreeDataflowHelper extends TreeIndexDataflowHelper {
-    public BTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists) {
-        super(opDesc, ctx, partition, createIfNotExists);
+    public BTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
     }
 
     @Override
     public ITreeIndex createIndexInstance() throws HyracksDataException {
         try {
-            // TODO: Figure out where to get the proper operation callback from.
             return BTreeUtils.createBTree(opDesc.getStorageManager().getBufferCache(ctx),
-                    NoOpOperationCallback.INSTANCE, treeOpDesc.getTreeIndexTypeTraits(),
+                    opCallbackProvider.getOperationCallback(), treeOpDesc.getTreeIndexTypeTraits(),
                     treeOpDesc.getTreeIndexComparatorFactories(), BTreeLeafFrameType.REGULAR_NSM);
         } catch (BTreeException e) {
             throw new HyracksDataException(e);
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java
index 308e305..7dcc5f3 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelperFactory.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
@@ -26,7 +27,7 @@
 
     @Override
     public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, boolean createIfNotExists) {
-        return new BTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists);
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        return new BTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
     }
 }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 6e05922..5883a1b 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -44,9 +45,10 @@
             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory) {
+            boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
+            IOperationCallbackProvider opCallbackProvider) {
         super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory);
+                comparatorFactories, dataflowHelperFactory, opCallbackProvider);
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
         this.lowKeyInclusive = lowKeyInclusive;
@@ -56,7 +58,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
-                highKeyFields, lowKeyInclusive, highKeyInclusive);
+        return new BTreeSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+                lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 10c5b6e..d583d04 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
@@ -25,17 +26,17 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 
 public class BTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
-	protected PermutingFrameTupleReference lowKey;
-	protected PermutingFrameTupleReference highKey;
-	protected boolean lowKeyInclusive;
-	protected boolean highKeyInclusive;
-	protected MultiComparator lowKeySearchCmp;
-	protected MultiComparator highKeySearchCmp;
+    protected PermutingFrameTupleReference lowKey;
+    protected PermutingFrameTupleReference highKey;
+    protected boolean lowKeyInclusive;
+    protected boolean highKeyInclusive;
+    protected MultiComparator lowKeySearchCmp;
+    protected MultiComparator highKeySearchCmp;
 
     public BTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, IRecordDescriptorProvider recordDescProvider, int[] lowKeyFields,
-            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
-        super(opDesc, ctx, partition, recordDescProvider);
+            IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
+        super(opDesc, ctx, opCallbackProvider, partition, recordDescProvider);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
@@ -58,13 +59,11 @@
             highKey.reset(accessor, tupleIndex);
         }
     }
-    
+
     @Override
     protected ISearchPredicate createSearchPredicate() {
         lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
-        highKeySearchCmp = BTreeUtils
-                .getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
-        return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
-                highKeySearchCmp);
+        highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
+        return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp, highKeySearchCmp);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java
index be967ef..5df52c1 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleUpdaterFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -40,16 +41,18 @@
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleUpdaterFactory tupleUpdaterFactory) {
+            IOperationCallbackProvider opCallbackProvider, ITupleUpdaterFactory tupleUpdaterFactory) {
         super(spec, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, dataflowHelperFactory);
+                lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, dataflowHelperFactory,
+                opCallbackProvider);
         this.tupleUpdaterFactory = tupleUpdaterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new BTreeUpdateSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
-                highKeyFields, lowKeyInclusive, highKeyInclusive, tupleUpdaterFactory.createTupleUpdater());
+        return new BTreeUpdateSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+                lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                tupleUpdaterFactory.createTupleUpdater());
     }
 }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
index 5a70e87..903eca2 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
@@ -21,31 +21,30 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleUpdater;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 
 public class BTreeUpdateSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
     private final ITupleUpdater tupleUpdater;
-    
-	public BTreeUpdateSearchOperatorNodePushable(
-			AbstractTreeIndexOperatorDescriptor opDesc,
-			IHyracksTaskContext ctx, int partition,
-			IRecordDescriptorProvider recordDescProvider,
-			int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
-			boolean highKeyInclusive, ITupleUpdater tupleUpdater) {
-		super(opDesc, ctx, partition, recordDescProvider, lowKeyFields,
-				highKeyFields, lowKeyInclusive, highKeyInclusive);
-		this.tupleUpdater = tupleUpdater;
-	}
 
-	@Override
-	protected ITreeIndexCursor createCursor() {
+    public BTreeUpdateSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            ITupleUpdater tupleUpdater) {
+        super(opDesc, ctx, opCallbackProvider, partition, recordDescProvider, lowKeyFields, highKeyFields,
+                lowKeyInclusive, highKeyInclusive);
+        this.tupleUpdater = tupleUpdater;
+    }
+
+    @Override
+    protected ITreeIndexCursor createCursor() {
         return new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
     }
-	
-	@Override
-	protected void writeSearchResults() throws Exception {
+
+    @Override
+    protected void writeSearchResults() throws Exception {
         while (cursor.hasNext()) {
             tb.reset();
             cursor.next();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IOperationCallbackProvider.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IOperationCallbackProvider.java
new file mode 100644
index 0000000..974ef1a
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IOperationCallbackProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.common.api;
+
+import java.io.Serializable;
+
+public interface IOperationCallbackProvider extends Serializable {
+    public IOperationCallback getOperationCallback();
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java
index 2e2b67e..ab668d1 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 public abstract class AbstractTreeIndexOperatorDescriptor extends
@@ -39,6 +40,8 @@
 	protected final ITypeTraits[] typeTraits;
 	protected final IIndexDataflowHelperFactory dataflowHelperFactory;
 
+    protected final IOperationCallbackProvider opCallbackProvider;
+
 	public AbstractTreeIndexOperatorDescriptor(JobSpecification spec,
 			int inputArity, int outputArity, RecordDescriptor recDesc,
 			IStorageManagerInterface storageManager,
@@ -46,7 +49,8 @@
 			IFileSplitProvider fileSplitProvider,
 			ITypeTraits[] typeTraits,
 			IBinaryComparatorFactory[] comparatorFactories,
-			IIndexDataflowHelperFactory dataflowHelperFactory) {
+			IIndexDataflowHelperFactory dataflowHelperFactory, 
+			IOperationCallbackProvider opCallbackProvider) {
 		super(spec, inputArity, outputArity);
 		this.fileSplitProvider = fileSplitProvider;
 		this.storageManager = storageManager;
@@ -54,6 +58,7 @@
 		this.typeTraits = typeTraits;
 		this.comparatorFactories = comparatorFactories;
 		this.dataflowHelperFactory = dataflowHelperFactory;
+        this.opCallbackProvider = opCallbackProvider;
 		if (outputArity > 0) {
 			recordDescriptors[0] = recDesc;
 		}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
index a42cf5f..344929c 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
@@ -18,8 +18,10 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 
 public interface IIndexDataflowHelperFactory extends Serializable {
     public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc,
-            final IHyracksTaskContext ctx, int partition, boolean createIfNotExists);
-}
+            final IHyracksTaskContext ctx, IOperationCallbackProvider opCallbackProvider, int partition,
+            boolean createIfNotExists);
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 99ebab7..062e7eb 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
@@ -26,22 +27,24 @@
     protected IIndex index;
     protected int indexFileId = -1;
     protected int partition;
-    
+
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
+    protected final IOperationCallbackProvider opCallbackProvider;
     protected final boolean createIfNotExists;
-    
+
     public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final IHyracksTaskContext ctx,
-            int partition, boolean createIfNotExists) {
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        this.opCallbackProvider = opCallbackProvider;
         this.opDesc = opDesc;
-        this.ctx = ctx;        
+        this.ctx = ctx;
         this.partition = partition;
         this.createIfNotExists = createIfNotExists;
     }
 
     public void init() throws HyracksDataException {
         IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
-        IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);        
+        IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
 
         FileReference f = getFilereference();
         int fileId = -1;
@@ -75,7 +78,7 @@
             }
             index = createIndexInstance();
             if (createIfNotExists) {
-                index.create(indexFileId);                
+                index.create(indexFileId);
             }
             index.open(indexFileId);
             indexRegistry.register(indexFileId, index);
@@ -88,7 +91,7 @@
         IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
         return fileSplitProvider.getFileSplits()[partition].getLocalFile();
     }
-    
+
     public void deinit() throws HyracksDataException {
         if (indexFileId != -1) {
             IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
@@ -111,4 +114,8 @@
     public int getIndexFileId() {
         return indexFileId;
     }
+
+    public IOperationCallbackProvider getOpCallbackProvider() {
+        return opCallbackProvider;
+    }
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index 90295be..ca74160 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 public class TreeIndexBulkLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -34,9 +35,10 @@
     public TreeIndexBulkLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
             IIndexRegistryProvider<IIndex> indexRegistryProvider, IFileSplitProvider fileSplitProvider,
             ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
-            float fillFactor, IIndexDataflowHelperFactory dataflowHelperFactory) {
-        super(spec, 1, 0, null, storageManager, indexRegistryProvider, fileSplitProvider, 
-                typeTraits, comparatorFactories, dataflowHelperFactory);
+            float fillFactor, IIndexDataflowHelperFactory dataflowHelperFactory,
+            IOperationCallbackProvider opCallbackProvider) {
+        super(spec, 1, 0, null, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, dataflowHelperFactory, opCallbackProvider);
         this.fieldPermutation = fieldPermutation;
         this.fillFactor = fillFactor;
     }
@@ -44,7 +46,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new TreeIndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
-                recordDescProvider);
+        return new TreeIndexBulkLoadOperatorNodePushable(this, ctx, opCallbackProvider, partition, fieldPermutation,
+                fillFactor, recordDescProvider);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index fe3b10f..f2031ca 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 
 public class TreeIndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
@@ -37,9 +38,11 @@
     private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
 
     public TreeIndexBulkLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+            IOperationCallbackProvider opCallbackProvider, int partition, int[] fieldPermutation, float fillFactor,
+            IRecordDescriptorProvider recordDescProvider) {
+
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition, true);
+                opDesc, ctx, opCallbackProvider, partition, true);
         this.fillFactor = fillFactor;
         this.recordDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
index 17c1827..c9e7e08 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
@@ -24,14 +25,15 @@
 
 public abstract class TreeIndexDataflowHelper extends IndexDataflowHelper {
     protected ITreeIndexOperatorDescriptor treeOpDesc;
-    public TreeIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists) {
-        super(opDesc, ctx, partition, createIfNotExists);
+
+    public TreeIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         this.treeOpDesc = (ITreeIndexOperatorDescriptor) opDesc;
     }
 
     public abstract ITreeIndex createIndexInstance() throws HyracksDataException;
-    
+
     public ITreeIndexCursor createDiskOrderScanCursor(ITreeIndexFrame leafFrame) throws HyracksDataException {
         return new TreeDiskOrderScanCursor(leafFrame);
     }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
index ab16acb..4bf308e 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 public class TreeIndexDiskOrderScanOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -31,14 +32,14 @@
     public TreeIndexDiskOrderScanOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
-            IIndexDataflowHelperFactory dataflowHelperFactory) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
         super(spec, 0, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits, null,
-                dataflowHelperFactory);
+                dataflowHelperFactory, opCallbackProvider);
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new TreeIndexDiskOrderScanOperatorNodePushable(this, ctx, partition);
+        return new TreeIndexDiskOrderScanOperatorNodePushable(this, ctx, opCallbackProvider, partition);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index d207314..78c0a82 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
@@ -34,9 +35,9 @@
     private ITreeIndex treeIndex;
 
     public TreeIndexDiskOrderScanOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition) {
+            IHyracksTaskContext ctx, IOperationCallbackProvider opCallbackProvider, int partition) {
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition, false);
+                opDesc, ctx, opCallbackProvider, partition, false);
     }
 
     @Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index ac90c1a..a06b37b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
@@ -38,9 +39,9 @@
             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOp op,
-            IIndexDataflowHelperFactory dataflowHelperFactory) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
         super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory);
+                comparatorFactories, dataflowHelperFactory, opCallbackProvider);
         this.fieldPermutation = fieldPermutation;
         this.op = op;
     }
@@ -48,7 +49,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new TreeIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                recordDescProvider, op);
+        return new TreeIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, opCallbackProvider, partition,
+                fieldPermutation, recordDescProvider, op);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index ddef338..623487b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 
@@ -37,12 +38,12 @@
     private IIndexAccessor indexAccessor;
 
     public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
-            IRecordDescriptorProvider recordDescProvider, IndexOp op) {
+            IHyracksTaskContext ctx, IOperationCallbackProvider opCallbackProvider, int partition,
+            int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOp op) {
         // Only create the if insert operation is an insert.
         boolean createIfNotExists = (op == IndexOp.INSERT);
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition, createIfNotExists);
+                opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         this.recordDescProvider = recordDescProvider;
         this.op = op;
         tuple.setFieldPermutation(fieldPermutation);
@@ -83,6 +84,10 @@
                         indexAccessor.update(tuple);
                         break;
                     }
+                    case UPSERT: {
+                        indexAccessor.upsert(tuple);
+                        break;
+                    }
                     case DELETE: {
                         indexAccessor.delete(tuple);
                         break;
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index 399643e..8557332 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -29,42 +29,43 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 
 public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-	protected TreeIndexDataflowHelper treeIndexHelper;
-	protected FrameTupleAccessor accessor;
+    protected TreeIndexDataflowHelper treeIndexHelper;
+    protected FrameTupleAccessor accessor;
 
-	protected ByteBuffer writeBuffer;
-	protected FrameTupleAppender appender;
-	protected ArrayTupleBuilder tb;
-	protected DataOutput dos;
+    protected ByteBuffer writeBuffer;
+    protected FrameTupleAppender appender;
+    protected ArrayTupleBuilder tb;
+    protected DataOutput dos;
 
-	protected ITreeIndex treeIndex;
-	protected ISearchPredicate searchPred;
-	protected IIndexCursor cursor;
-	protected ITreeIndexFrame cursorFrame;
-	protected IIndexAccessor indexAccessor;
+    protected ITreeIndex treeIndex;
+    protected ISearchPredicate searchPred;
+    protected IIndexCursor cursor;
+    protected ITreeIndexFrame cursorFrame;
+    protected IIndexAccessor indexAccessor;
 
-	protected RecordDescriptor recDesc;
+    protected RecordDescriptor recDesc;
 
     public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, IRecordDescriptorProvider recordDescProvider) {
+            IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider) {
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition, false);
+                opDesc, ctx, opCallbackProvider, partition, false);
         this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
     }
-    
+
     protected abstract ISearchPredicate createSearchPredicate();
-    
+
     protected abstract void resetSearchPredicate(int tupleIndex);
-    
+
     protected IIndexCursor createCursor() {
         return indexAccessor.createSearchCursor();
     }
-    
+
     @Override
     public void open() throws HyracksDataException {
         accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
index c227e17..b7beec0 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 public class TreeIndexStatsOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -35,14 +36,14 @@
     public TreeIndexStatsOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
             IIndexRegistryProvider<IIndex> indexRegistryProvider, IFileSplitProvider fileSplitProvider,
             ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories,
-            IIndexDataflowHelperFactory dataflowHelperFactory) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
         super(spec, 0, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory);
+                comparatorFactories, dataflowHelperFactory, opCallbackProvider);
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new TreeIndexStatsOperatorNodePushable(this, ctx, partition);
+        return new TreeIndexStatsOperatorNodePushable(this, ctx, opCallbackProvider, partition);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index ff007af..2e8e300 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.util.TreeIndexStats;
 import edu.uci.ics.hyracks.storage.am.common.util.TreeIndexStatsGatherer;
@@ -36,9 +37,9 @@
     private TreeIndexStatsGatherer statsGatherer;
 
     public TreeIndexStatsOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition) {
+            IOperationCallbackProvider opCallbackProvider, int partition) {
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition, false);
+                opDesc, ctx, opCallbackProvider, partition, false);
         this.ctx = ctx;
     }
 
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index cd7ca70..828dd81 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -21,18 +21,21 @@
 /**
  * Dummy operation callback that simply does nothing. Mainly, intended to be
  * used in non-transaction access method testing.
- * 
  */
 public class NoOpOperationCallback implements IOperationCallback {
 
     public static IOperationCallback INSTANCE = new NoOpOperationCallback();
     
+    private NoOpOperationCallback() {
+    }
+    
     @Override
     public void pre(ITupleReference tuple) {
-        
+        // Do nothing.
     }
 
     @Override
     public void post(ITupleReference tuple) {
+        // Do nothing.
     }
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallbackProvider.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallbackProvider.java
new file mode 100644
index 0000000..55dfb74
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/NoOpOperationCallbackProvider.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.storage.am.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+
+/**
+ * Dummy NoOp callback provider used primarily for testing. Always returns the 
+ * {@link NoOpOperationCallback} instance. 
+ *
+ * Implemented as an enum to preserve singleton model while being serializable
+ */
+public enum NoOpOperationCallbackProvider implements IOperationCallbackProvider {
+    INSTANCE;
+
+    @Override
+    public IOperationCallback getOperationCallback() {
+        return NoOpOperationCallback.INSTANCE;
+    }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java
index 4b5a492..77ad7ff 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOp.java
@@ -16,5 +16,5 @@
 package edu.uci.ics.hyracks.storage.am.common.ophelpers;
 
 public enum IndexOp {
-	INSERT, DELETE, UPDATE, UPSERT, SEARCH, DISKORDERSCAN
+	INSERT, DELETE, UPDATE, UPSERT, SEARCH, DISKORDERSCAN, PHYSICALDELETE
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index 5106d52..c4d6747 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex.InvertedIndexBulkLoadContext;
 
@@ -42,7 +43,7 @@
     public InvertedIndexBulkLoadOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider) {
         btreeDataflowHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
-                .createIndexDataflowHelper(opDesc, ctx, partition, true);
+                .createIndexDataflowHelper(opDesc, ctx, NoOpOperationCallbackProvider.INSTANCE, partition, true);
         invIndexDataflowHelper = new InvertedIndexDataflowHelper(btreeDataflowHelper, opDesc, ctx, partition, true);
         this.recordDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
index 1bba574..ed9a6bd 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedListBuilder;
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.FixedSizeElementInvertedListBuilder;
@@ -33,7 +34,7 @@
 
     public InvertedIndexDataflowHelper(TreeIndexDataflowHelper btreeDataflowHelper, IIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, boolean createIfNotExists) {
-        super(opDesc, ctx, partition, createIfNotExists);
+        super(opDesc, ctx, NoOpOperationCallbackProvider.INSTANCE, partition, createIfNotExists);
         this.btreeDataflowHelper = btreeDataflowHelper;
     }
 
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
index 534bb65..8128e0d 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifier;
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndexSearchPredicate;
@@ -45,11 +46,11 @@
     private FrameTupleReference tuple;
     private IRecordDescriptorProvider recordDescProvider;
     private InvertedIndex invIndex;
-    
+
     private final InvertedIndexSearchPredicate searchPred;
     private IIndexAccessor indexAccessor;
-    private IIndexCursor resultCursor;        
-    
+    private IIndexCursor resultCursor;
+
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
     private ArrayTupleBuilder tb;
@@ -62,7 +63,7 @@
             IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
         btreeDataflowHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
-                .createIndexDataflowHelper(opDesc, ctx, partition, false);
+                .createIndexDataflowHelper(opDesc, ctx, NoOpOperationCallbackProvider.INSTANCE, partition, false);
         invIndexDataflowHelper = new InvertedIndexDataflowHelper(btreeDataflowHelper, opDesc, ctx, partition, false);
         this.queryField = queryField;
         this.searchPred = new InvertedIndexSearchPredicate(searchModifier);
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 0ad8aa1..40195f1 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -20,12 +20,12 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
@@ -33,25 +33,26 @@
 
 public class LSMBTreeDataflowHelper extends TreeIndexDataflowHelper {
     private static int DEFAULT_MEM_PAGE_SIZE = 32768;
-    private static int DEFAULT_MEM_NUM_PAGES = 1000;    
-    
+    private static int DEFAULT_MEM_NUM_PAGES = 1000;
+
     private final int memPageSize;
-    private final int memNumPages;    
-    
-    public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists) {
-        super(opDesc, ctx, partition, createIfNotExists);
+    private final int memNumPages;
+
+    public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         memPageSize = DEFAULT_MEM_PAGE_SIZE;
-        memNumPages = DEFAULT_MEM_NUM_PAGES;        
+        memNumPages = DEFAULT_MEM_NUM_PAGES;
     }
-    
-    public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists, int memPageSize, int memNumPages) {
-        super(opDesc, ctx, partition, createIfNotExists);
+
+    public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
+            int memNumPages) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         this.memPageSize = memPageSize;
         this.memNumPages = memNumPages;
     }
-    
+
     @Override
     public ITreeIndex createIndexInstance() throws HyracksDataException {
         ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
@@ -63,10 +64,9 @@
             file.delete();
         }
         InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
-        // TODO: Figure out where to get the proper operation callback from.
-        return LSMBTreeUtils.createLSMTree(memBufferCache, NoOpOperationCallback.INSTANCE, memFreePageManager, (IOManager) ctx.getIOManager(), file
-                .getFile().getPath(), opDesc.getStorageManager().getBufferCache(ctx), opDesc
-                .getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc
-                .getTreeIndexComparatorFactories());
+        return LSMBTreeUtils.createLSMTree(memBufferCache, opCallbackProvider.getOperationCallback(),
+                memFreePageManager, (IOManager) ctx.getIOManager(), file.getFile().getPath(), opDesc
+                        .getStorageManager().getBufferCache(ctx), opDesc.getStorageManager().getFileMapProvider(ctx),
+                treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc.getTreeIndexComparatorFactories());
     }
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index da00694..98cfeed 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
@@ -26,7 +27,7 @@
 
     @Override
     public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, boolean createIfNotExists) {
-        return new LSMBTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists);
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        return new LSMBTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
     }
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index dee8072..454db50 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -26,9 +26,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
@@ -60,9 +57,9 @@
 
 public class LSMBTree implements ILSMIndex, ITreeIndex {
     protected final Logger LOGGER = Logger.getLogger(LSMBTree.class.getName());
-    
+
     private final LSMHarness lsmHarness;
-    
+
     // In-memory components.   
     private final BTree memBTree;
     private final InMemoryFreePageManager memFreePageManager;
@@ -74,24 +71,25 @@
     // For creating BTree's used in bulk load. Different from diskBTreeFactory
     // because it should have a different tuple writer in it's leaf frames.
     private final BTreeFactory bulkLoadBTreeFactory;
-    private final IBufferCache diskBufferCache;    
+    private final IBufferCache diskBufferCache;
     private final IFileMapProvider diskFileMapProvider;
     // List of BTree instances. Using Object for better sharing via ILSMTree + LSMHarness.
     private LinkedList<Object> diskBTrees = new LinkedList<Object>();
     // Helps to guarantees physical consistency of LSM components.
     private final ILSMComponentFinalizer componentFinalizer;
-    
+
     // Common for in-memory and on-disk components.
     private final ITreeIndexFrameFactory insertLeafFrameFactory;
     private final ITreeIndexFrameFactory deleteLeafFrameFactory;
     private final IBinaryComparatorFactory[] cmpFactories;
-    
+
     private boolean isOpen = false;
-    
+
     public LSMBTree(IBufferCache memBufferCache, IOperationCallback memOpCallback, InMemoryFreePageManager memFreePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
-            ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager, BTreeFactory diskBTreeFactory,
-            BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories) {
+            ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager,
+            BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider,
+            int fieldCount, IBinaryComparatorFactory[] cmpFactories) {
         memBTree = new BTree(memBufferCache, memOpCallback, fieldCount, cmpFactories, memFreePageManager, interiorFrameFactory,
                 insertLeafFrameFactory);
         this.memFreePageManager = memFreePageManager;
@@ -113,7 +111,7 @@
         memBTree.create(indexFileId);
         fileManager.createDirs();
     }
-    
+
     /**
      * Opens LSMBTree, cleaning up invalid files from base dir, and registering
      * all valid files as on-disk BTrees.
@@ -124,14 +122,14 @@
      */
     @Override
     public void open(int indexFileId) throws HyracksDataException {
-        synchronized(this) {
+        synchronized (this) {
             if (isOpen) {
                 return;
             }
             memBTree.open(indexFileId);
             BTree dummyBTree = diskBTreeFactory.createIndexInstance();
             List<Object> validFileNames = fileManager.cleanupAndGetValidFiles(dummyBTree, componentFinalizer);
-            for (Object o : validFileNames) {     
+            for (Object o : validFileNames) {
                 String fileName = (String) o;
                 FileReference fileRef = new FileReference(new File(fileName));
                 BTree btree = createDiskBTree(diskBTreeFactory, fileRef, false);
@@ -143,12 +141,12 @@
 
     @Override
     public void close() throws HyracksDataException {
-        synchronized(this) {
+        synchronized (this) {
             if (!isOpen) {
                 return;
             }
             for (Object o : diskBTrees) {
-                BTree btree = (BTree) o;            
+                BTree btree = (BTree) o;
                 diskBufferCache.closeFile(btree.getFileId());
                 diskBufferCache.deleteFile(btree.getFileId(), false);
                 btree.close();
@@ -160,106 +158,17 @@
     }
 
     @Override
-    public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {        
+    public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+            TreeIndexException {
         LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
-        // TODO: This will become much simpler once the BTree supports a true upsert operation.
-        try {
-            ctx.memBTreeAccessor.insert(tuple);
-        } catch (BTreeDuplicateKeyException e) {
-            // Notice that a flush must wait for the current operation to
-            // finish (threadRefCount must reach zero).
-            // TODO: The methods below are very inefficient, we'd rather like
-            // to flip the antimatter bit one single BTree traversal.
-            if (ctx.getIndexOp() == IndexOp.DELETE) {
-                return deleteExistingKey(tuple, ctx);
-            } else {
-                return insertOrUpdateExistingKey(tuple, ctx);
-            }
-        }
-        return true;
-    }
-    
-	private boolean deleteExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        // We assume that tuple given by the user for deletion only contains the
-        // key fields, but not any non-key fields.
-        // Therefore, to set the delete bit in the tuple that already exist in
-        // the BTree, we must retrieve the original tuple first. This is to
-        // ensure that we have the proper value field.
-        if (cmpFactories.length != memBTree.getFieldCount()) {
-            ctx.reset(IndexOp.SEARCH);
-            RangePredicate rangePredicate = new RangePredicate(tuple, tuple, true, true, ctx.cmp, ctx.cmp);
-            ITreeIndexCursor cursor = ctx.memBTreeAccessor.createSearchCursor();
-            ctx.memBTreeAccessor.search(cursor, rangePredicate);
-            ITupleReference tupleCopy = null;
-            try {
-                if (cursor.hasNext()) {
-                    cursor.next();
-                    tupleCopy = TupleUtils.copyTuple(cursor.getTuple());
-                }
-            } finally {
-                cursor.close();
-            }
-            // This means the tuple we are looking for must have been truly deleted by another thread.
-            // Simply restart the original operation to insert the antimatter tuple. 
-            // There is a remote chance of livelocks due to this behavior.
-            if (tupleCopy == null) {
-                ctx.reset(IndexOp.DELETE);
-                return false;
-            }
-            return memBTreeUpdate(tupleCopy, ctx);
-        } else {
-            // Since the existing tuple could be a matter tuple, we must delete it and re-insert.
-            return memBTreeDeleteAndReinsert(tuple, ctx);
-        }            
-	}
-	
-    private boolean insertOrUpdateExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
-            TreeIndexException {        
-        // If all fields are keys, and the key we are trying to insert/update
-        // already exists, then we are already done.
-        // Otherwise, we must update the non-key fields.        
-        if (cmpFactories.length != memBTree.getFieldCount()) {
-            return memBTreeUpdate(tuple, ctx);
-        } else {
-            // Since the existing tuple could be an antimatter tuple, we must delete it and re-insert.
-            return memBTreeDeleteAndReinsert(tuple, ctx);
-        }
-    }
-    
-    private boolean memBTreeDeleteAndReinsert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
-            TreeIndexException {
-        // All fields are key fields, therefore a true BTree update is not
-        // allowed.
-        // In order to set/unset the antimatter bit, we
-        // must truly delete the existing tuple from the BTree, and then
-        // re-insert it (with the antimatter bit set/unset).
-        // Since the tuple given by the user already has all fields, we
-        // don't need to retrieve the already existing tuple.
-        IndexOp originalOp = ctx.getIndexOp();
-        try {
+        
+        if(ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
             ctx.memBTreeAccessor.delete(tuple);
-        } catch (BTreeNonExistentKeyException e) {
-            // Tuple has been deleted in the meantime. We will restart
-            // our operation anyway.
+            return true;
         }
-        // Restart performOp to insert the tuple.
-        ctx.reset(originalOp);
-        return false;
-    }
-    
-    private boolean memBTreeUpdate(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
-            TreeIndexException {
-        IndexOp originalOp = ctx.getIndexOp();
-        try {
-            ctx.reset(IndexOp.UPDATE);
-            ctx.memBTreeAccessor.update(tuple);
-        } catch (BTreeNonExistentKeyException e) {
-            // It is possible that the key has truly been deleted.
-            // Simply restart the operation. There is a remote chance of
-            // livelocks due to this behavior.
-            ctx.reset(originalOp);
-            return false;
-        }
+        
+        ctx.memBTreeAccessor.upsert(tuple);
+
         return true;
     }
 
@@ -289,36 +198,38 @@
     public void addFlushedComponent(Object index) {
         diskBTrees.addFirst(index);
     }
-    
+
     @Override
     public void resetInMemoryComponent() throws HyracksDataException {
         memFreePageManager.reset();
         memBTree.create(memBTree.getFileId());
     }
-    
+
     private BTree createBulkLoadTarget() throws HyracksDataException {
         String relFlushFileName = (String) fileManager.getRelFlushFileName();
         FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
         return createDiskBTree(bulkLoadBTreeFactory, fileRef, true);
     }
-    
+
     private BTree createFlushTarget() throws HyracksDataException {
         String relFlushFileName = (String) fileManager.getRelFlushFileName();
         FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
         return createDiskBTree(diskBTreeFactory, fileRef, true);
     }
-    
+
     private BTree createMergeTarget(List<Object> mergingDiskBTrees) throws HyracksDataException {
         BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
         BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
         FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
         FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
-        String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
+        String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
+                .getFile().getName());
         FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
         return createDiskBTree(diskBTreeFactory, fileRef, true);
     }
-    
-    private BTree createDiskBTree(BTreeFactory factory, FileReference fileRef, boolean createBTree) throws HyracksDataException {
+
+    private BTree createDiskBTree(BTreeFactory factory, FileReference fileRef, boolean createBTree)
+            throws HyracksDataException {
         // File will be deleted during cleanup of merge().
         diskBufferCache.createFile(fileRef);
         int diskBTreeFileId = diskFileMapProvider.lookupFileId(fileRef);
@@ -333,16 +244,17 @@
         diskBTree.open(diskBTreeFileId);
         return diskBTree;
     }
-    
-    public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
+
+    public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+            boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
         LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
         LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
         int numDiskBTrees = diskComponents.size();
-        int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;                
-        LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
-                insertLeafFrameFactory, ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness);
+        int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
+        LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
+                ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness);
         lsmTreeCursor.open(initialState, pred);
-        
+
         int cursorIx;
         if (includeMemComponent) {
             // Open cursor of in-memory BTree at index 0.
@@ -352,12 +264,12 @@
         } else {
             cursorIx = 0;
         }
-        
+
         // Open cursors of on-disk BTrees.
         ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskBTrees];
         int diskBTreeIx = 0;
         ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
-        while(diskBTreesIter.hasNext()) {
+        while (diskBTreesIter.hasNext()) {
             BTree diskBTree = (BTree) diskBTreesIter.next();
             diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
             diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
@@ -366,8 +278,8 @@
         }
         lsmTreeCursor.initPriorityQueue();
     }
-    
-    public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException  {
+
+    public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
         LSMBTreeOpContext ctx = createOpContext();
         ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
         RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
@@ -376,13 +288,13 @@
         // merged now, so we can clean them up after the merge has completed.
         List<Object> mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
         mergedComponents.addAll(mergingDiskBTrees);
-        
+
         // Nothing to merge.
         if (mergedComponents.size() <= 1) {
             cursor.close();
             return null;
         }
-        
+
         // Bulk load the tuples from all on-disk BTrees into the new BTree.
         BTree mergedBTree = createMergeTarget(mergingDiskBTrees);
         IIndexBulkLoadContext bulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
@@ -395,16 +307,16 @@
         } finally {
             cursor.close();
         }
-        mergedBTree.endBulkLoad(bulkLoadCtx);       
+        mergedBTree.endBulkLoad(bulkLoadCtx);
         return mergedBTree;
     }
-    
+
     @Override
     public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
         diskBTrees.removeAll(mergedComponents);
         diskBTrees.addLast(newComponent);
     }
-    
+
     @Override
     public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
         for (Object o : mergedComponents) {
@@ -416,7 +328,7 @@
             fileRef.getFile().delete();
         }
     }
-    
+
     @Override
     public InMemoryFreePageManager getInMemoryFreePageManager() {
         return (InMemoryFreePageManager) memBTree.getFreePageManager();
@@ -426,11 +338,11 @@
     public List<Object> getDiskComponents() {
         return diskBTrees;
     }
-    
+
     public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
         private final BTree btree;
         private IIndexBulkLoadContext bulkLoadCtx;
-        
+
         public LSMTreeBulkLoadContext(BTree btree) {
             this.btree = btree;
         }
@@ -438,20 +350,20 @@
         public void beginBulkLoad(float fillFactor) throws HyracksDataException, TreeIndexException {
             bulkLoadCtx = btree.beginBulkLoad(fillFactor);
         }
-    
+
         public BTree getBTree() {
             return btree;
         }
-        
+
         public IIndexBulkLoadContext getBulkLoadCtx() {
             return bulkLoadCtx;
         }
     }
-    
+
     @Override
     public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
         BTree diskBTree = createBulkLoadTarget();
-        LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskBTree);        
+        LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskBTree);
         bulkLoadCtx.beginBulkLoad(fillFactor);
         return bulkLoadCtx;
     }
@@ -503,11 +415,11 @@
     public int getFileId() {
         return memBTree.getFileId();
     }
-    
+
     public IBinaryComparatorFactory[] getComparatorFactories() {
         return cmpFactories;
     }
-    
+
     public LSMBTreeOpContext createOpContext() {
         return new LSMBTreeOpContext(memBTree, insertLeafFrameFactory, deleteLeafFrameFactory);
     }
@@ -516,7 +428,7 @@
     public IIndexAccessor createAccessor() {
         return new LSMBTreeIndexAccessor(lsmHarness, createOpContext());
     }
-    
+
     public class LSMBTreeIndexAccessor extends LSMTreeIndexAccessor {
         public LSMBTreeIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
             super(lsmHarness, ctx);
@@ -526,7 +438,7 @@
         public IIndexCursor createSearchCursor() {
             return new LSMBTreeRangeSearchCursor();
         }
-        
+
         public MultiComparator getMultiComparator() {
             LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
             return concreteCtx.cmp;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index a688b5b..6f0f866 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
@@ -43,4 +44,12 @@
      * @throws TreeIndexException
      */
     public void merge() throws HyracksDataException, IndexException;
+
+    /**
+     * Deletes the tuple from the memory component only.
+     * 
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 28cea26..de2bdcd 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -73,4 +73,10 @@
     public void merge() throws HyracksDataException, IndexException {
         lsmHarness.merge();
     }
+    
+    @Override
+    public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+        ctx.reset(IndexOp.PHYSICALDELETE);
+        lsmHarness.insertUpdateOrDelete(tuple, ctx);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
index 4f9280d..52ae64a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
@@ -44,20 +45,21 @@
     private final IBinaryComparatorFactory[] btreeComparatorFactories;
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
 
-    public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists, IBinaryComparatorFactory[] btreeComparatorFactories,
-            IPrimitiveValueProviderFactory[] valueProviderFactories) {
-        super(opDesc, ctx, partition, createIfNotExists);
+    public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists,
+            IBinaryComparatorFactory[] btreeComparatorFactories, IPrimitiveValueProviderFactory[] valueProviderFactories) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         memPageSize = DEFAULT_MEM_PAGE_SIZE;
         memNumPages = DEFAULT_MEM_NUM_PAGES;
         this.btreeComparatorFactories = btreeComparatorFactories;
         this.valueProviderFactories = valueProviderFactories;
     }
 
-    public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists, int memPageSize, int memNumPages,
-            IBinaryComparatorFactory[] btreeComparatorFactories, IPrimitiveValueProviderFactory[] valueProviderFactories) {
-        super(opDesc, ctx, partition, createIfNotExists);
+    public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
+            int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
+            IPrimitiveValueProviderFactory[] valueProviderFactories) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         this.memPageSize = memPageSize;
         this.memNumPages = memNumPages;
         this.btreeComparatorFactories = btreeComparatorFactories;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 573a0f7..b0f9377 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -37,8 +38,8 @@
 
     @Override
     public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, boolean createIfNotExists) {
-        return new LSMRTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists, btreeComparatorFactories,
-                valueProviderFactories);
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        return new LSMRTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists,
+                btreeComparatorFactories, valueProviderFactories);
     }
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 803706c..a1e25ae 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -288,6 +288,10 @@
     public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
             TreeIndexException {
         LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+        if (ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
+            throw new UnsupportedOperationException("Physical delete not yet supported in LSM R-tree");
+        }
+        
         if (ctx.getIndexOp() == IndexOp.INSERT) {
             // Before each insert, we must check whether there exist a killer
             // tuple in the memBTree. If we find a killer tuple, we must truly
@@ -342,9 +346,8 @@
         return true;
     }
 
-    public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
-            IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount)
-            throws HyracksDataException, IndexException {
+    public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+            boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
         LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
         int numDiskTrees = diskComponents.size();
         int numTrees = (includeMemComponent) ? numDiskTrees + 1 : numDiskTrees;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
index 99a349a..6f2b3e9 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -27,9 +28,10 @@
 
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
 
-    public RTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            boolean createIfNotExists, IPrimitiveValueProviderFactory[] valueProviderFactories) {
-        super(opDesc, ctx, partition, createIfNotExists);
+    public RTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists,
+            IPrimitiveValueProviderFactory[] valueProviderFactories) {
+        super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists);
         this.valueProviderFactories = valueProviderFactories;
     }
 
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
index c76aa63..4871ce9 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -26,14 +27,15 @@
     private static final long serialVersionUID = 1L;
 
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
-    
+
     public RTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories) {
         this.valueProviderFactories = valueProviderFactories;
     }
-    
+
     @Override
     public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, boolean createIfNotExists) {
-        return new RTreeDataflowHelper(opDesc, ctx, partition, createIfNotExists, valueProviderFactories);
+            IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists) {
+        return new RTreeDataflowHelper(opDesc, ctx, opCallbackProvider, partition, createIfNotExists,
+                valueProviderFactories);
     }
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index 843eb34..d797ff2 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -39,15 +40,16 @@
             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
-            IIndexDataflowHelperFactory dataflowHelperFactory) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider) {
         super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory);
+                comparatorFactories, dataflowHelperFactory, opCallbackProvider);
         this.keyFields = keyFields;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new RTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, keyFields);
+        return new RTreeSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+                keyFields);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index c5a36c9..0dde9b6 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
@@ -30,8 +31,9 @@
     protected MultiComparator cmp;
 
     public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
-        super(opDesc, ctx, partition, recordDescProvider);
+            IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+            int[] keyFields) {
+        super(opDesc, ctx, opCallbackProvider, partition, recordDescProvider);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
             searchKey.setFieldPermutation(keyFields);