added proper callbacks to operators. passed all existing tests EXCEPT abort test.(The expected result of the existing abort test is different from the one under the entity-level commit)

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@852 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 9258bf0..577592d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -166,10 +166,11 @@
                 jobSpec, queryField, appContext.getStorageManagerInterface(), secondarySplitsAndConstraint.first,
                 appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                 invListsTypeTraits, invListsComparatorFactories, new LSMInvertedIndexDataflowHelperFactory(
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
-                queryTokenizerFactory, searchModifierFactory, outputRecDesc, retainInput,
-                NoOpOperationCallbackFactory.INSTANCE);
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER), queryTokenizerFactory,
+                searchModifierFactory, outputRecDesc, retainInput, NoOpOperationCallbackFactory.INSTANCE);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(invIndexSearchOp,
                 secondarySplitsAndConstraint.second);
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 60b7e6c..dec56aa 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -68,6 +68,7 @@
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
@@ -526,9 +527,9 @@
         builder.setTypeTraitProvider(format.getTypeTraitProvider());
         builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
 
-        JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE);
-        // set the job event listener
-        spec.setJobletEventListenerFactory(new JobEventListenerFactory(asterixJobId, isWriteTransaction));
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(asterixJobId, isWriteTransaction);
+        JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE, jobEventListenerFactory);
+
         if (pc.isPrintJob()) {
             switch (pdf) {
                 case HTML: {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index b34c52b..09c7458 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -125,32 +125,32 @@
                     IIndexDataflowHelperFactory dfhFactory;
                     switch (index.getIndexType()) {
                         case BTREE:
-                            dfhFactory = new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE);
+                            dfhFactory = new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
                             break;
                         case RTREE:
                             dfhFactory = new LSMRTreeDataflowHelperFactory(
                                     new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
-                                    new IBinaryComparatorFactory[] { null }, AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE, null);
+                                    new IBinaryComparatorFactory[] { null }, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
                             break;
                         case NGRAM_INVIX:
                         case WORD_INVIX:
                             dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE,
-                                    AsterixRuntimeComponentsProvider.INSTANCE);
+                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
                             break;
                         default:
                             throw new AsterixException("Unknown index type provided.");
                     }
                     IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
-                            AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                            AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                             idxSplitsAndConstraint.first, dfhFactory);
                     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
                             idxSplitsAndConstraint.second);
@@ -166,10 +166,10 @@
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
         IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE));
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER));
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
                 splitsAndConstraint.second);
 
@@ -200,10 +200,10 @@
         //TODO replace this transient one to the persistent one 
         TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
         TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                 splitsAndConstraint.first, typeTraits, comparatorFactories, new LSMBTreeDataflowHelperFactory(
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER),
                 localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);
@@ -277,11 +277,11 @@
         LOGGER.info("LOAD into File Splits: " + sb.toString());
 
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
                 GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, new LSMBTreeDataflowHelperFactory(
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER),
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
                 splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 8b2d583..798bc34 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -46,10 +46,12 @@
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = datasetDecls
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE));
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER));
         AlgebricksPartitionConstraintHelper
                 .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
         spec.addRoot(btreeDrop);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 3ecb365..e9c56db 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -31,11 +31,12 @@
         //TODO replace this transient one to the persistent one 
         TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
         TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                 secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
-                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), localResourceFactoryProvider,
+                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
                 secondaryPartitionConstraint);
@@ -68,9 +69,10 @@
 
         // Create secondary BTree bulk load op.
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys,
-                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), BTree.DEFAULT_FILL_FACTOR);
+                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), BTree.DEFAULT_FILL_FACTOR);
 
         // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 169297a..ab61bc4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -224,11 +224,14 @@
         // +Infinity
         int[] highKeyFields = null;
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                 primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories, lowKeyFields,
-                highKeyFields, true, true, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), false, NoOpOperationCallbackFactory.INSTANCE);
+                highKeyFields, true, true, new LSMBTreeDataflowHelperFactory(
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), false,
+                NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                 primaryPartitionConstraint);
         return primarySearchOp;
@@ -283,7 +286,7 @@
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
         TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
                 secondarySplitsAndConstraint.first, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
                 fieldPermutation, fillFactor, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 681059f..2ac2325 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -121,12 +121,13 @@
         //TODO replace the transient one to persistent one
         ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
         LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, secondaryFileSplitProvider,
-                AsterixRuntimeComponentsProvider.INSTANCE, tokenTypeTraits, tokenComparatorFactories,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
                 invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
-                new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), localResourceFactoryProvider,
+                new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
                 secondaryPartitionConstraint);
@@ -216,12 +217,14 @@
             fieldPermutation[i] = i;
         }
         LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, fieldPermutation, false, AsterixRuntimeComponentsProvider.INSTANCE, secondaryFileSplitProvider,
-                AsterixRuntimeComponentsProvider.INSTANCE, tokenTypeTraits, tokenComparatorFactories,
-                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
-                new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), NoOpOperationCallbackFactory.INSTANCE);
+                spec, fieldPermutation, false, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+                secondaryFileSplitProvider, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits,
+                tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
+                new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER),
+                NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return invIndexBulkLoadOp;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index f650899..f7be12f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -54,13 +54,14 @@
         //TODO replace this transient one to the persistent one 
         TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
         TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                 secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
                 new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                        primaryComparatorFactories, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
-                                secondaryComparatorFactories.length)), localResourceFactoryProvider,
+                        primaryComparatorFactories, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+                                keyType, secondaryComparatorFactories.length)), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
                 secondaryPartitionConstraint);
@@ -139,10 +140,11 @@
                 spec,
                 numNestedSecondaryKeyFields,
                 new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                        primaryComparatorFactories, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
-                                secondaryComparatorFactories.length)), BTree.DEFAULT_FILL_FACTOR);
+                        primaryComparatorFactories, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+                                keyType, secondaryComparatorFactories.length)), BTree.DEFAULT_FILL_FACTOR);
 
         // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java
deleted file mode 100644
index bb2c313..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package edu.uci.ics.asterix.file;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-
-public class TestKeywordIndexJob {
-
-    private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-    static {
-        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-    }
-
-    public static int DEFAULT_INPUT_DATA_COLUMN = 0;
-    public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
-
-    @SuppressWarnings("unchecked")
-    public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
-
-        JobSpecification spec = new JobSpecification();
-
-        // ---------- START GENERAL BTREE STUFF
-
-        // ---------- END GENERAL BTREE STUFF
-
-        List<String> nodeGroup = new ArrayList<String>();
-        nodeGroup.add("nc1");
-        nodeGroup.add("nc2");
-
-        // ---------- START KEY PROVIDER OP
-
-        // TODO: should actually be empty tuple source
-        // build tuple containing low and high search keys
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
-        DataOutput dos = tb.getDataOutput();
-
-        tb.reset();
-        AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos); // dummy
-        // field
-        tb.addFieldEndOffset();
-
-        ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        String[] keyProviderOpLocationConstraint = new String[nodeGroup.size()];
-        for (int p = 0; p < nodeGroup.size(); p++) {
-            keyProviderOpLocationConstraint[p] = nodeGroup.get(p);
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, keyProviderOpLocationConstraint);
-
-        // ---------- END KEY PROVIDER OP
-
-        // ---------- START SECONRARY INDEX SCAN
-
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[2];
-        secondaryTypeTraits[0] = new ITypeTraits() {
-
-            @Override
-            public boolean isFixedLength() {
-                return false;
-            }
-
-            @Override
-            public int getFixedLength() {
-                return -1;
-            }
-        };
-
-        secondaryTypeTraits[1] = new ITypeTraits() {
-
-            @Override
-            public boolean isFixedLength() {
-                return true;
-            }
-
-            @Override
-            public int getFixedLength() {
-                return 5;
-            }
-        };
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[2];
-        secondaryRecFields[0] = AObjectSerializerDeserializer.INSTANCE;
-        secondaryRecFields[1] = AObjectSerializerDeserializer.INSTANCE;
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
-        secondaryComparatorFactories[0] = AObjectAscBinaryComparatorFactory.INSTANCE;
-        secondaryComparatorFactories[1] = AObjectAscBinaryComparatorFactory.INSTANCE;
-
-        int[] lowKeyFields = null;
-        int[] highKeyFields = null;
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-        // TODO: change file splits according to mount points in cluster config
-        IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
-                new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameInvIndex"))),
-                new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameInvIndex"))) });
-        BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                secondarySplitProvider, secondaryTypeTraits, secondaryComparatorFactories, lowKeyFields, highKeyFields,
-                true, true, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), false, NoOpOperationCallbackFactory.INSTANCE);
-        String[] secondarySearchOpLocationConstraint = new String[nodeGroup.size()];
-        for (int p = 0; p < nodeGroup.size(); p++) {
-            secondarySearchOpLocationConstraint[p] = nodeGroup.get(p);
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp,
-                secondarySearchOpLocationConstraint);
-
-        // ---------- END SECONDARY INDEX SCAN
-
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        String[] printerLocationConstraint = new String[nodeGroup.size()];
-        for (int p = 0; p < nodeGroup.size(); p++) {
-            printerLocationConstraint[p] = nodeGroup.get(p);
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, printerLocationConstraint);
-
-        // ---------- START CONNECT THE OPERATORS
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
-
-        // ---------- END CONNECT THE OPERATORS
-
-        spec.addRoot(printer);
-
-        return spec;
-    }
-
-    public static void main(String[] args) throws Exception {
-        String host;
-        String appName;
-        String ddlFile;
-
-        switch (args.length) {
-            case 0: {
-                host = "127.0.0.1";
-                appName = "asterix";
-                ddlFile = "/home/abehm/workspace/asterix/src/test/resources/demo0927/local/create-index.aql";
-                System.out.println("No arguments specified, using defauls:");
-                System.out.println("HYRACKS HOST: " + host);
-                System.out.println("APPNAME:      " + appName);
-                System.out.println("DDLFILE:      " + ddlFile);
-            }
-                break;
-
-            case 3: {
-                host = args[0];
-                appName = args[1];
-                ddlFile = args[2];
-            }
-                break;
-
-            default: {
-                System.out.println("USAGE:");
-                System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
-                System.out.println("ARG 2: Application Name (e.g., asterix)");
-                System.out.println("ARG 3: DDL File");
-                host = null;
-                appName = null;
-                ddlFile = null;
-                System.exit(0);
-            }
-                break;
-        }
-
-        int port = 1098;
-        IHyracksClientConnection hcc = new HyracksConnection(host, port);
-
-        TestKeywordIndexJob tij = new TestKeywordIndexJob();
-        JobSpecification jobSpec = tij.createJobSpec();
-
-        long start = System.currentTimeMillis();
-        JobId jobId = hcc.startJob("asterix", jobSpec);
-        hcc.waitForCompletion(jobId);
-        long end = System.currentTimeMillis();
-        System.err.println(start + " " + end + " " + (end - start));
-    }
-}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java
deleted file mode 100644
index 29d5273..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package edu.uci.ics.asterix.file;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-
-public class TestSecondaryIndexJob {
-
-    private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-    static {
-        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-    }
-
-    public static int DEFAULT_INPUT_DATA_COLUMN = 0;
-    public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
-
-    @SuppressWarnings("unchecked")
-    public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
-
-        JobSpecification spec = new JobSpecification();
-
-        // ---------- START GENERAL BTREE STUFF
-
-        // ---------- END GENERAL BTREE STUFF
-
-        List<String> nodeGroup = new ArrayList<String>();
-        nodeGroup.add("nc1");
-        nodeGroup.add("nc2");
-
-        // ---------- START KEY PROVIDER OP
-
-        // TODO: should actually be empty tuple source
-        // build tuple containing low and high search keys
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
-        DataOutput dos = tb.getDataOutput();
-
-        tb.reset();
-        AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos); // dummy
-        // field
-        tb.addFieldEndOffset();
-
-        ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        String[] keyProviderOpLocationConstraint = new String[nodeGroup.size()];
-        for (int p = 0; p < nodeGroup.size(); p++) {
-            keyProviderOpLocationConstraint[p] = nodeGroup.get(p);
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, keyProviderOpLocationConstraint);
-
-        // ---------- END KEY PROVIDER OP
-
-        // ---------- START SECONRARY INDEX SCAN
-
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[2];
-        secondaryTypeTraits[0] = new ITypeTraits() {
-
-            @Override
-            public boolean isFixedLength() {
-                return false;
-            }
-
-            @Override
-            public int getFixedLength() {
-                return -1;
-            }
-        };
-
-        secondaryTypeTraits[1] = new ITypeTraits() {
-
-            @Override
-            public boolean isFixedLength() {
-                return true;
-            }
-
-            @Override
-            public int getFixedLength() {
-                return 5;
-            }
-        };
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[2];
-        secondaryRecFields[0] = AObjectSerializerDeserializer.INSTANCE;
-        secondaryRecFields[1] = AObjectSerializerDeserializer.INSTANCE;
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
-        secondaryComparatorFactories[0] = AObjectAscBinaryComparatorFactory.INSTANCE;
-        secondaryComparatorFactories[1] = AObjectAscBinaryComparatorFactory.INSTANCE;
-
-        int[] lowKeyFields = null; // -infinity
-        int[] highKeyFields = null; // +infinity
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-        // TODO: change file splits according to mount points in cluster config
-        IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
-                new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameBtreeIndex"))),
-                new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameBtreeIndex"))) });
-        BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
-                AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                secondarySplitProvider, secondaryTypeTraits, secondaryComparatorFactories, lowKeyFields, highKeyFields,
-                true, true, new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), false, NoOpOperationCallbackFactory.INSTANCE);
-        String[] secondarySearchOpLocationConstraint = new String[nodeGroup.size()];
-        for (int p = 0; p < nodeGroup.size(); p++) {
-            secondarySearchOpLocationConstraint[p] = nodeGroup.get(p);
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp,
-                secondarySearchOpLocationConstraint);
-
-        // ---------- END SECONDARY INDEX SCAN
-
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        String[] printerLocationConstraint = new String[nodeGroup.size()];
-        for (int p = 0; p < nodeGroup.size(); p++) {
-            printerLocationConstraint[p] = nodeGroup.get(p);
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, printerLocationConstraint);
-
-        // ---------- START CONNECT THE OPERATORS
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
-
-        // ---------- END CONNECT THE OPERATORS
-
-        spec.addRoot(printer);
-
-        return spec;
-    }
-
-    public static void main(String[] args) throws Exception {
-        String host;
-        String appName;
-        String ddlFile;
-
-        switch (args.length) {
-            case 0: {
-                host = "127.0.0.1";
-                appName = "asterix";
-                ddlFile = "/home/nicnic/workspace/asterix/trunk/asterix/asterix-app/src/test/resources/demo0927/local/create-index.aql";
-                System.out.println("No arguments specified, using defauls:");
-                System.out.println("HYRACKS HOST: " + host);
-                System.out.println("APPNAME:      " + appName);
-                System.out.println("DDLFILE:      " + ddlFile);
-            }
-                break;
-
-            case 3: {
-                host = args[0];
-                appName = args[1];
-                ddlFile = args[2];
-            }
-                break;
-
-            default: {
-                System.out.println("USAGE:");
-                System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
-                System.out.println("ARG 2: Application Name (e.g., asterix)");
-                System.out.println("ARG 3: DDL File");
-                host = null;
-                appName = null;
-                ddlFile = null;
-                System.exit(0);
-            }
-                break;
-        }
-
-        int port = 1098;
-        IHyracksClientConnection hcc = new HyracksConnection(host, port);
-
-        TestSecondaryIndexJob tij = new TestSecondaryIndexJob();
-        JobSpecification jobSpec = tij.createJobSpec();
-
-        long start = System.currentTimeMillis();
-        JobId jobId = hcc.startJob("asterix", jobSpec);
-        hcc.waitForCompletion(jobId);
-        long end = System.currentTimeMillis();
-        System.err.println(start + " " + end + " " + (end - start));
-    }
-}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java
index dca790d..cef79bf 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfoImpl.java
@@ -19,7 +19,7 @@
 
     @Override
     public IStorageManagerInterface getStorageManagerInterface() {
-        return AsterixRuntimeComponentsProvider.INSTANCE;
+        return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
     }
 
     public static void setNodeControllerInfo(Map<String, Set<String>> nodeControllerInfo) {
@@ -32,7 +32,7 @@
 
     @Override
     public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
-        return AsterixRuntimeComponentsProvider.INSTANCE;
+        return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
     }
 
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
index a5684fc..ca45c89 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
@@ -1,10 +1,15 @@
 package edu.uci.ics.asterix.common.context;
 
+import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -12,23 +17,34 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
 import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
 
-public enum AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
-        ILSMIOOperationSchedulerProvider, ILSMFlushControllerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerFactory {
-    INSTANCE;
+public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
+        ILSMIOOperationSchedulerProvider, ILSMFlushControllerProvider, ILSMMergePolicyProvider,
+        ILSMOperationTrackerFactory {
+    private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
+
+    public static final AsterixRuntimeComponentsProvider LSMBTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
+            LSMBTreeIOOperationCallbackFactory.INSTANCE);
+    public static final AsterixRuntimeComponentsProvider LSMRTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
+            LSMRTreeIOOperationCallbackFactory.INSTANCE);
+    public static final AsterixRuntimeComponentsProvider LSMINVERTEDINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(
+            LSMInvertedIndexIOOperationCallbackFactory.INSTANCE);
+    public static final AsterixRuntimeComponentsProvider NOINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(null);
+
+    private AsterixRuntimeComponentsProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+        this.ioOpCallbackFactory = ioOpCallbackFactory;
+    }
 
     @Override
     public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
-        // TODO: Replace this with the IndexOperationTracker, once the other transactional components are in place.
-        return new ReferenceCountingOperationTracker(index);
+        return new IndexOperationTracker(index, ioOpCallbackFactory);
     }
-    
+
     @Override
     public ILSMFlushController getFlushController(IHyracksTaskContext ctx) {
         return ((AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 26148bb..55a62cf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -130,6 +130,10 @@
                 .getTransactionalResourceRepository();
         resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_BTREE, new IndexResourceManager(
                 ResourceType.LSM_BTREE, runtimeContext.getTransactionSubsystem()));
+        resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_RTREE, new IndexResourceManager(
+                ResourceType.LSM_RTREE, runtimeContext.getTransactionSubsystem()));
+        resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_INVERTED_INDEX,
+                new IndexResourceManager(ResourceType.LSM_INVERTED_INDEX, runtimeContext.getTransactionSubsystem()));
 
         metadataNodeName = asterixProperties.getMetadataNodeName();
         isNewUniverse = asterixProperties.isNewUniverse();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index fa428bb..5e56779 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -16,11 +16,13 @@
 package edu.uci.ics.asterix.metadata.declared;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
@@ -46,6 +48,12 @@
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.asterix.runtime.base.AsterixTupleFilterFactory;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -70,6 +78,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -83,6 +92,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
@@ -319,12 +329,34 @@
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
+
+        ISearchOperationCallbackFactory searchCallbackFactory = null;
+        if (isSecondary) {
+            searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+        } else {
+            JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            List<LogicalVariable> primaryKeyVars = new ArrayList<LogicalVariable>();
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
+                primaryKeyVars.add(new LogicalVariable(outputVars.get(i).getId()));
+            }
+            IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+                    .variablesToBinaryHashFunctionFactories(primaryKeyVars, typeEnv, context);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                    entityIdFieldHashFunctionFactories, txnSubsystemProvider, ResourceType.LSM_BTREE);
+        }
+
         BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
                 typeTraits, comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
-                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE), retainInput, NoOpOperationCallbackFactory.INSTANCE);
+                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), retainInput, searchCallbackFactory);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
     }
 
@@ -403,13 +435,17 @@
         RecordDescriptor recDesc = new RecordDescriptor(recordFields);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+        ISearchOperationCallbackFactory searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
         RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
                 typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(valueProviderFactories,
-                        RTreePolicyType.RTREE, primaryComparatorFactories, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
-                                comparatorFactories.length)), false, NoOpOperationCallbackFactory.INSTANCE);
+                        RTreePolicyType.RTREE, primaryComparatorFactories,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+                                nestedKeyType.getTypeTag(), comparatorFactories.length)), false, searchCallbackFactory);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
     }
 
@@ -515,20 +551,25 @@
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+
+        //TODO
+        //figure out the right behavior of the bulkload and then give the right callback
+        //(ex. what's the expected behavior when there is an error during bulkload?)
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
                 GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, new LSMBTreeDataflowHelperFactory(
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
-                NoOpOperationCallbackFactory.INSTANCE);
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), NoOpOperationCallbackFactory.INSTANCE);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
         String datasetName = dataSource.getId().getDatasetName();
         int numKeys = keys.size();
         // Move key fields to front.
@@ -558,39 +599,55 @@
                 itemType, context.getBinaryComparatorFactoryProvider());
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+        //prepare callback
+        JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+        int datasetId = dataset.getDatasetId();
+        int[] primaryKeyFields = new int[numKeys];
+        for (i = 0; i < numKeys; i++) {
+            primaryKeyFields[i] = i;
+        }
+        IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+                .variablesToBinaryHashFunctionFactories(keys, typeEnv, context);
+        TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+        PrimaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
+                jobId, datasetId, primaryKeyFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, indexOp,
+                ResourceType.LSM_BTREE);
+
         TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(),
                 appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                 comparatorFactories, fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE), null,
-                NoOpOperationCallbackFactory.INSTANCE);
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), null, modificationCallbackFactory);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload, recordDesc,
-                context, spec);
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
+                recordDesc, context, spec);
     }
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload, recordDesc,
-                context, spec);
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> keys, LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
+                recordDesc, context, spec);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(IndexOperation indexOp,
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec) throws AlgebricksException {
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
+            IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
         Dataset dataset = metadata.findDataset(datasetName);
@@ -601,12 +658,12 @@
         AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
         switch (secondaryIndex.getIndexType()) {
             case BTREE: {
-                return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
-                        filterFactory, recordDesc, context, spec, indexOp);
+                return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+                        secondaryKeys, filterFactory, recordDesc, context, spec, indexOp);
             }
             case RTREE: {
-                return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
-                        filterFactory, recordDesc, context, spec, indexOp);
+                return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+                        secondaryKeys, filterFactory, recordDesc, context, spec, indexOp);
             }
             default: {
                 throw new AlgebricksException("Insert and delete not implemented for index type: "
@@ -621,8 +678,8 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
+        return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
     }
 
     @Override
@@ -631,8 +688,8 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
+        return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
     }
 
     private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
@@ -649,9 +706,10 @@
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
-            String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
+            String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
         int numKeys = primaryKeys.size() + secondaryKeys.size();
         // generate field permutations
         int[] fieldPermutation = new int[numKeys];
@@ -703,20 +761,40 @@
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+        //prepare callback
+        JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+        int datasetId = dataset.getDatasetId();
+        int[] primaryKeyFields = new int[primaryKeys.size()];
+        i = 0;
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            primaryKeyFields[i] = idx;
+            i++;
+        }
+        IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+                .variablesToBinaryHashFunctionFactories(primaryKeys, typeEnv, context);
+        TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+        SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+                jobId, datasetId, primaryKeyFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, indexOp,
+                ResourceType.LSM_BTREE);
+
         TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(),
                 appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                 comparatorFactories, fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE),
-                filterFactory, NoOpOperationCallbackFactory.INSTANCE);
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER), filterFactory, modificationCallbackFactory);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
-            String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
+            String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp) throws AlgebricksException {
         Dataset dataset = metadata.findDataset(datasetName);
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType = metadata.findType(itemTypeName);
@@ -768,15 +846,35 @@
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+
+        //prepare callback
+        JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+        int datasetId = dataset.getDatasetId();
+        int[] primaryKeyFields = new int[numKeys];
+        i = 0;
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            primaryKeyFields[i] = idx;
+            i++;
+        }
+        IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories = JobGenHelper
+                .variablesToBinaryHashFunctionFactories(primaryKeys, typeEnv, context);
+        TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+        SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+                jobId, datasetId, primaryKeyFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, indexOp,
+                ResourceType.LSM_RTREE);
+
         TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(),
                 appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                 comparatorFactories, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
                         valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        AsterixRuntimeComponentsProvider.INSTANCE, AsterixRuntimeComponentsProvider.INSTANCE,
-                        proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length)), filterFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+                                nestedKeyType.getTypeTag(), comparatorFactories.length)), filterFactory,
+                modificationCallbackFactory);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
     }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index 471a0b8..45e19c6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -22,6 +22,10 @@
         this.jobId = jobId;
         this.transactionalWrite = transactionalWrite;
     }
+    
+    public JobId getJobId() {
+        return jobId;
+    }
 
     @Override
     public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 8a38526..a51da07 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -22,6 +22,11 @@
 public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
+    
+    public static LSMBTreeIOOperationCallbackFactory INSTANCE = new LSMBTreeIOOperationCallbackFactory();
+    
+    private LSMBTreeIOOperationCallbackFactory() {
+    }
 
     @Override
     public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index d8b796c..790c60c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -23,6 +23,11 @@
 
     private static final long serialVersionUID = 1L;
 
+    public static LSMInvertedIndexIOOperationCallbackFactory INSTANCE = new LSMInvertedIndexIOOperationCallbackFactory();
+    
+    private LSMInvertedIndexIOOperationCallbackFactory() {
+    }
+    
     @Override
     public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
         return new LSMInvertedIndexIOOperationCallback((IndexOperationTracker) syncObj);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 2a8af33..4b47a95 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -22,6 +22,11 @@
 public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
+    
+    public static LSMRTreeIOOperationCallbackFactory INSTANCE = new LSMRTreeIOOperationCallbackFactory();
+    
+    private LSMRTreeIOOperationCallbackFactory() {
+    }
 
     @Override
     public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
index 066a611..1f018b4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.FieldsHashValueGenerator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public abstract class AbstractOperationCallback {
@@ -29,12 +30,20 @@
     protected final ILockManager lockManager;
     protected final TransactionContext txnCtx;
     protected int transactorLocalNumActiveOperations = 0;
-    
-    public AbstractOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
-            IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager) {
-        this.datasetId = datasetId;
+
+    public AbstractOperationCallback(int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories, TransactionContext txnCtx,
+            ILockManager lockManager) {
+        this.datasetId = new DatasetId(datasetId);
         this.primaryKeyFields = primaryKeyFields;
-        this.primaryKeyHashFunctions = primaryKeyHashFunctions;
+        if (primaryKeyHashFunctionFactories != null) {
+            this.primaryKeyHashFunctions = new IBinaryHashFunction[primaryKeyHashFunctionFactories.length];
+            for (int i = 0; i < primaryKeyHashFunctionFactories.length; ++i) {
+                this.primaryKeyHashFunctions[i] = primaryKeyHashFunctionFactories[i].createBinaryHashFunction();
+            }
+        } else {
+            this.primaryKeyHashFunctions = null;
+        }
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;
     }
@@ -43,19 +52,19 @@
             IBinaryHashFunction[] primaryKeyHashFunctions) {
         return FieldsHashValueGenerator.computeFieldsHashValue(tuple, primaryKeyFields, primaryKeyHashFunctions);
     }
-    
+
     public TransactionContext getTransactionContext() {
         return txnCtx;
     }
-    
+
     public int getLocalNumActiveOperations() {
         return transactorLocalNumActiveOperations;
     }
-    
+
     public void incrementLocalNumActiveOperations() {
         transactorLocalNumActiveOperations++;
     }
-    
+
     public void decrementLocalNumActiveOperations() {
         transactorLocalNumActiveOperations--;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
index 8d11d04..84c747b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
@@ -15,24 +15,28 @@
 
 package edu.uci.ics.asterix.transaction.management.opcallbacks;
 
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import java.io.Serializable;
+
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 
-public abstract class AbstractOperationCallbackFactory {
+public abstract class AbstractOperationCallbackFactory implements Serializable{
     protected final JobId jobId;
-    protected final DatasetId datasetId;
+    protected final int datasetId;
     protected final int[] primaryKeyFields;
-    protected final IBinaryHashFunction[] primaryKeyHashFunctions;
+    protected final IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories;
     protected final ITransactionSubsystemProvider txnSubsystemProvider;
+    protected final byte resourceType;
 
-    public AbstractOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
-            IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider) {
+    public AbstractOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories,
+            ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
-        this.primaryKeyHashFunctions = primaryKeyHashFunctions;
+        this.primaryKeyHashFunctionFactories = primaryKeyHashFunctionFactories;
         this.txnSubsystemProvider = txnSubsystemProvider;
+        this.resourceType = resourceType;
     }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
index a34e6dd..67bf68a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
@@ -16,13 +16,13 @@
 package edu.uci.ics.asterix.transaction.management.opcallbacks;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 
 public class IndexOperationTracker implements ILSMOperationTracker {
@@ -38,12 +38,16 @@
         this.index = index;
         accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
-        ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
+        if (ioOpCallbackFactory != null) {
+            ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
+        } else {
+            ioOpCallback = null;
+        }
     }
 
     @Override
-    public synchronized boolean beforeOperation(ILSMIndexOperationContext opCtx, boolean tryOperation)
-            throws HyracksDataException {
+    public synchronized boolean beforeOperation(ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback, boolean tryOperation) throws HyracksDataException {
         // Wait for pending flushes to complete.
         // If flushFlag is set, then the flush is queued to occur by the last completing operation.
         // This operation should wait for that flush to occur before proceeding.
@@ -60,28 +64,32 @@
         numActiveOperations++;
 
         // Increment transactor-local active operations count.
-        AbstractOperationCallback opCallback = getOperationCallback(opCtx);
-        opCallback.incrementLocalNumActiveOperations();
-
+        AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+        if (opCallback != null) {
+            opCallback.incrementLocalNumActiveOperations();
+        }
         return true;
     }
 
     @Override
-    public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+    public void afterOperation(ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback) throws HyracksDataException {
         // Searches are immediately considered complete, because they should not prevent the execution of flushes.
-        IndexOperation op = opCtx.getOperation();
-        if (op == IndexOperation.SEARCH || op == IndexOperation.DISKORDERSCAN) {
-            completeOperation(opCtx);
+        if (searchCallback != null) {
+            completeOperation(searchCallback, modificationCallback);
         }
     }
 
     @Override
-    public synchronized void completeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+    public synchronized void completeOperation(ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback) throws HyracksDataException {
         numActiveOperations--;
 
         // Decrement transactor-local active operations count.
-        AbstractOperationCallback opCallback = getOperationCallback(opCtx);
-        opCallback.decrementLocalNumActiveOperations();
+        AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+        if (opCallback != null) {
+            opCallback.decrementLocalNumActiveOperations();
+        }
 
         // If we need a flush, and this is the last completing operation, then schedule the flush.
         // Once the flush has completed notify all waiting operations.
@@ -90,11 +98,16 @@
         }
     }
 
-    private AbstractOperationCallback getOperationCallback(ILSMIndexOperationContext opCtx) {
-        if (opCtx.getSearchOperationCallback() != null) {
-            return (AbstractOperationCallback) opCtx.getSearchOperationCallback();
+    private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback) {
+
+        if (searchCallback == NoOpOperationCallback.INSTANCE || modificationCallback == NoOpOperationCallback.INSTANCE) {
+            return null;
+        }
+        if (searchCallback != null) {
+            return (AbstractOperationCallback) searchCallback;
         } else {
-            return (AbstractOperationCallback) opCtx.getModificationCallback();
+            return (AbstractOperationCallback) modificationCallback;
         }
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 3c9f092..d3c7eb0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -18,12 +18,11 @@
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -38,14 +37,16 @@
         IModificationOperationCallback {
 
     protected final long resourceId;
+    protected final byte resourceType;
     protected final IndexOperation indexOp;
     protected final TransactionSubsystem txnSubsystem;
 
-    public PrimaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
-            IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager,
-            TransactionSubsystem txnSubsystem, long resourceId, IndexOperation indexOp) {
-        super(datasetId, primaryKeyFields, primaryKeyHashFunctions, txnCtx, lockManager);
+    public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories, TransactionContext txnCtx,
+            ILockManager lockManager, TransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, lockManager);
         this.resourceId = resourceId;
+        this.resourceType = resourceType;
         this.indexOp = indexOp;
         this.txnSubsystem = txnSubsystem;
     }
@@ -62,11 +63,11 @@
 
     @Override
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
-        IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, ResourceType.LSM_BTREE);
+        IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
         int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
         LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
         IndexOperation oldOp = IndexOperation.INSERT;
-        if (lsmBTreeTuple.isAntimatter()) {
+        if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
             oldOp = IndexOperation.DELETE;
         }
         try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index a11d00e..be972a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -16,13 +16,16 @@
 package edu.uci.ics.asterix.transaction.management.opcallbacks;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -38,24 +41,31 @@
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
 
-    public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
-            IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider,
-            IndexOperation indexOp) {
-        super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctions, txnSubsystemProvider);
+    public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories,
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+        super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(long resourceId, IHyracksTaskContext ctx)
-            throws HyracksDataException {
+    public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
+            IHyracksTaskContext ctx) throws HyracksDataException {
         TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        ILSMIndex index = (ILSMIndex) txnSubsystem.getTransactionalResourceRepository().getTransactionalResource(
-                resourceId);
+        TransactionalResourceRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+        ILSMIndex index = (ILSMIndex) txnResourceRepository.getTransactionalResource(resourceId);
+
+        //register the resource if it is not registered
+        if (index == null) {
+            txnSubsystem.getTransactionalResourceRepository().registerTransactionalResource(resourceId, resource);
+            index = (ILSMIndex) resource;
+        }
+
         try {
             TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, primaryKeyHashFunctions, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
-                    resourceId, indexOp);
+                    primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, txnSubsystem.getLockManager(),
+                    txnSubsystem, resourceId, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(index, (AbstractOperationCallback) modCallback);
             return modCallback;
         } catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index d055126..85e8980 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -17,10 +17,9 @@
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -30,9 +29,10 @@
  */
 public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
 
-    public PrimaryIndexSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
-            IBinaryHashFunction[] entityIdFieldHashFunctions, ILockManager lockManager, TransactionContext txnCtx) {
-        super(datasetId, entityIdFields, entityIdFieldHashFunctions, txnCtx, lockManager);
+    public PrimaryIndexSearchOperationCallback(int datasetId, int[] entityIdFields,
+            IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories, ILockManager lockManager,
+            TransactionContext txnCtx) {
+        super(datasetId, entityIdFields, entityIdFieldHashFunctionFactories, txnCtx, lockManager);
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index a124196..3e6dfcb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -16,13 +16,12 @@
 package edu.uci.ics.asterix.transaction.management.opcallbacks;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -32,9 +31,10 @@
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexSearchOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] entityIdFields,
-            IBinaryHashFunction[] entityIdFieldHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider) {
-        super(jobId, datasetId, entityIdFields, entityIdFieldHashFunctions, txnSubsystemProvider);
+    public PrimaryIndexSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields,
+            IBinaryHashFunctionFactory[] entityIdFieldHashFunctionFactories,
+            ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
+        super(jobId, datasetId, entityIdFields, entityIdFieldHashFunctionFactories, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -43,8 +43,8 @@
         TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
-            return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
-                    txnSubsystem.getLockManager(), txnCtx);
+            return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields,
+                    primaryKeyHashFunctionFactories, txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index d2fa889..8c1aac0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -18,11 +18,10 @@
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -37,15 +36,17 @@
         IModificationOperationCallback {
 
     protected final long resourceId;
+    protected final byte resourceType;
     protected final IndexOperation indexOp;
     protected final IndexOperation oldOp;
     protected final TransactionSubsystem txnSubsystem;
 
-    public SecondaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
-            IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager,
-            TransactionSubsystem txnSubsystem, long resourceId, IndexOperation indexOp) {
-        super(datasetId, primaryKeyFields, primaryKeyHashFunctions, txnCtx, lockManager);
+    public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories, TransactionContext txnCtx,
+            ILockManager lockManager, TransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnCtx, lockManager);
         this.resourceId = resourceId;
+        this.resourceType = resourceType;
         this.indexOp = indexOp;
         oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
         this.txnSubsystem = txnSubsystem;
@@ -58,7 +59,7 @@
 
     @Override
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
-        IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, ResourceType.LSM_BTREE);
+        IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
         int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
         try {
             logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 51ea0bf..3fede95 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -16,17 +16,18 @@
 package edu.uci.ics.asterix.transaction.management.opcallbacks;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionSubsystemProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
         IModificationOperationCallbackFactory {
@@ -34,21 +35,31 @@
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
 
-    public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
-            IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider,
-            IndexOperation indexOp) {
-        super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctions, txnSubsystemProvider);
+    public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] primaryKeyHashFunctionFactories,
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+        super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctionFactories, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(long resourceId, IHyracksTaskContext ctx)
+    public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource, IHyracksTaskContext ctx)
             throws HyracksDataException {
         TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+        TransactionalResourceRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+        ILSMIndex index = (ILSMIndex) txnResourceRepository.getTransactionalResource(resourceId);
+
+        //register the resource if it is not registered
+        if (index == null) {
+            txnSubsystem.getTransactionalResourceRepository().registerTransactionalResource(resourceId, resource);
+            index = (ILSMIndex) resource;
+        }
+        
         try {
             TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
             return new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields,
-                    primaryKeyHashFunctions, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, indexOp);
+                    primaryKeyHashFunctionFactories, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
+                    resourceType, indexOp);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index 67c9ba0..16aefee 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -26,7 +26,7 @@
         ISearchOperationCallback {
 
     public SecondaryIndexSearchOperationCallback() {
-        super(null, null, null, null, null);
+        super(-1, null, null, null, null);
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
index 5997bf1..fbaa12f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
@@ -45,9 +45,6 @@
             if (resourceRepository.get(resourceId) == null) {
                 MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
                 resourceRepository.put(newMutableResourceId, resource);
-                
-                // wake up threads waiting for the resource
-                resourceRepository.notifyAll();
             }
         }
     }
@@ -56,9 +53,6 @@
         synchronized (resourceMgrRepository) {
             if (resourceMgrRepository.get(id) == null) {
                 resourceMgrRepository.put(id, resourceMgr);
-                
-                // wake up threads waiting for the resource manager
-                resourceMgrRepository.notifyAll();
             }
         }
     }
@@ -66,15 +60,6 @@
     public Object getTransactionalResource(long resourceId) {
         synchronized (resourceRepository) {
             mutableResourceId.setId(resourceId);
-            while (resourceRepository.get(mutableResourceId) == null) {
-                try {
-                    resourceRepository.wait();
-                } catch (InterruptedException ie) {
-                    ie.printStackTrace();
-                    break; // the thread might be interrupted due to other
-                    // failures occurring elsewhere, break from the loop
-                }
-            }
             return resourceRepository.get(mutableResourceId);
         }
     }
@@ -83,7 +68,5 @@
         synchronized (resourceMgrRepository) {
             return resourceMgrRepository.get(id);
         }
-
     }
-
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
index 85d349b..1eeadff 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
@@ -28,8 +28,6 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
-
-
 public class IndexLogger implements ILogger, ICloseable {
 
     private final Map<Object, Object> jobId2ReusableLogContentObjectRepositoryMap = new ConcurrentHashMap<Object, Object>();
@@ -107,9 +105,15 @@
             reusableLogContentObject.setOldValue(oldValue);
         }
 
-        int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/+ tupleWriter
-                .bytesRequired(newValue);
-        logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/+ tupleWriter.bytesRequired(oldValue);
+        int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/;
+        if (newValue != null) {
+            logContentSize += tupleWriter.bytesRequired(newValue);
+        }
+
+        logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/;
+        if (oldValue != null) {
+            logContentSize += tupleWriter.bytesRequired(oldValue);
+        }
 
         txnSubsystem.getLogManager().log(LogType.UPDATE, context, datasetId, PKHashValue, resourceId, resourceType,
                 logContentSize, reusableLogContentObject, this, reusableLogContentObject.getLogicalLogLocator());
@@ -132,14 +136,18 @@
         offset += 1;
 
         //new tuple size
-        tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
+        if (reusableLogContentObject.getNewValue() != null) {
+            tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
+        }
         (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
         offset += 4;
 
         //new tuple
-        tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
-                logicalLogLocator.getMemoryOffset() + offset);
-        offset += tupleSize;
+        if (tupleSize != 0) {
+            tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
+                    logicalLogLocator.getMemoryOffset() + offset);
+            offset += tupleSize;
+        }
 
         if (resourceType == ResourceType.LSM_BTREE) {
             //old operation
@@ -149,13 +157,19 @@
 
             if (reusableLogContentObject.getOldOperation() != IndexOperation.NOOP) {
                 //old tuple size
-                tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
+                if (reusableLogContentObject.getOldValue() != null) {
+                    tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
+                } else {
+                    tupleSize = 0;
+                }
                 (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
                 offset += 4;
 
-                //old tuple
-                tupleWriter.writeTuple(reusableLogContentObject.getNewValue(),
-                        logicalLogLocator.getBuffer().getArray(), logicalLogLocator.getMemoryOffset() + offset);
+                if (tupleSize != 0) {
+                    //old tuple
+                    tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer()
+                            .getArray(), logicalLogLocator.getMemoryOffset() + offset);
+                }
             }
         }
     }
@@ -169,7 +183,7 @@
     public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject)
             throws ACIDException {
     }
-    
+
     /**
      * Represents a utility class for generating log records corresponding to
      * operations on a ITreeIndex implementation. A TreeLogger instance is thread
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 77a7b1b..9a097d6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,14 +27,13 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.service.logging.FileUtil;
 import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogCursor;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogFilter;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogRecordHelper;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogRecordHelper;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
@@ -54,7 +53,7 @@
 public class RecoveryManager implements IRecoveryManager {
 
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
-    private TransactionSubsystem transactionProvider;
+    private TransactionSubsystem txnSubsystem;
 
     /**
      * A file at a known location that contains the LSN of the last log record
@@ -66,7 +65,7 @@
     private Map<Long, List<PhysicalLogLocator>> dirtyPagesTable;
 
     public RecoveryManager(TransactionSubsystem TransactionProvider) throws ACIDException {
-        this.transactionProvider = TransactionProvider;
+        this.txnSubsystem = TransactionProvider;
         try {
             FileUtil.createFileIfNotExists(checkpoint_record_file);
         } catch (IOException ioe) {
@@ -79,14 +78,14 @@
     }
 
     private PhysicalLogLocator getBeginRecoveryLSN() throws ACIDException {
-        return new PhysicalLogLocator(0, transactionProvider.getLogManager());
+        return new PhysicalLogLocator(0, txnSubsystem.getLogManager());
     }
 
     /**
      * TODO:This method is currently not implemented completely.
      */
     public SystemState startRecovery(boolean synchronous) throws IOException, ACIDException {
-        ILogManager logManager = transactionProvider.getLogManager();
+        ILogManager logManager = txnSubsystem.getLogManager();
         state = SystemState.RECOVERING;
         transactionTable = new HashMap<Long, TransactionTableEntry>();
         dirtyPagesTable = new HashMap<Long, List<PhysicalLogLocator>>();
@@ -110,13 +109,16 @@
                     break;
                 }
                 byte resourceMgrId = parser.getResourceMgrId(memLSN);
-                IResourceManager resourceMgr = transactionProvider.getTransactionalResourceRepository()
+                IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
                         .getTransactionalResourceMgr(resourceMgrId);
+                //register resourceMgr if it is not registered.
                 if (resourceMgr == null) {
-                    throw new ACIDException("unknown resource mgr with id " + resourceMgrId);
-                } else {
-                    resourceMgr.redo(parser, memLSN);
+                    resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+                    txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+                            resourceMgrId, resourceMgr);
                 }
+                resourceMgr.redo(parser, memLSN);
+
                 writeCheckpointRecord(memLSN.getLsn());
             }
             state = SystemState.HEALTHY;
@@ -162,7 +164,7 @@
      */
     @Override
     public void rollbackTransaction(TransactionContext txnContext) throws ACIDException {
-        ILogManager logManager = transactionProvider.getLogManager();
+        ILogManager logManager = txnSubsystem.getLogManager();
         ILogRecordHelper parser = logManager.getLogRecordHelper();
 
         // Obtain the last log record written by the transaction
@@ -209,16 +211,21 @@
                     }
 
                     // look up the repository to get the resource manager
-                    IResourceManager resourceMgr = transactionProvider.getTransactionalResourceRepository()
+                    IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
                             .getTransactionalResourceMgr(resourceMgrId);
+
+                    // register resourceMgr if it is not registered. 
                     if (resourceMgr == null) {
-                        throw new ACIDException(txnContext, " unknown resource manager " + resourceMgrId);
-                    } else {
-                        resourceMgr.undo(parser, logLocator);
+                        resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+                        txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+                                resourceMgrId, resourceMgr);
                     }
+                    resourceMgr.undo(parser, logLocator);
                     break;
+
                 case LogType.COMMIT:
                     throw new ACIDException(txnContext, " cannot rollback commmitted transaction");
+
                 default:
                     throw new ACIDException("Unsupported LogType: " + logType);
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
index ca5a71e..d4f15d9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
@@ -27,6 +27,9 @@
             int fh = fieldHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
                     tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx));
             h = h * 31 + fh;
+            if (h < 0) {
+                h = h*(-1);
+            }
         }
         return h;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java
index b5eb7f8..857d8ae 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/ITransactionSubsystemProvider.java
@@ -15,8 +15,10 @@
 
 package edu.uci.ics.asterix.transaction.management.service.transaction;
 
+import java.io.Serializable;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
-public interface ITransactionSubsystemProvider {
+public interface ITransactionSubsystemProvider extends Serializable{
     public TransactionSubsystem getTransactionSubsystem(IHyracksTaskContext ctx);
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index ebcd506..bbade35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager.TransactionState;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 /**
@@ -93,6 +95,16 @@
         }
     }
 
+    public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException {
+        synchronized (indexes) {
+            for (int i = 0; i < indexes.size(); i++) {
+                ILSMIndex index = indexes.get(i);
+                IModificationOperationCallback modificationCallback = (IModificationOperationCallback) callbacks.get(i);
+                ((IndexOperationTracker) index.getOperationTracker()).completeOperation(null, modificationCallback);
+            }
+        }
+    }
+
     public void setTransactionType(TransactionType transactionType) {
         this.transactionType = transactionType;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index b93425c..a22a7a2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -21,6 +21,7 @@
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An implementation of the @see ITransactionManager interface that provides
@@ -96,6 +97,12 @@
             //for entity-level commit
             if (PKHashVal != -1) {
                 transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+                try {
+                    //decrease the transaction reference count on index
+                    txnContext.decreaseActiveTransactionCountOnIndexes();
+                } catch (HyracksDataException e) {
+                    throw new ACIDException("failed to complete index operation", e);
+                }
                 return;
             }