checkpoint towards fixing LSN related issues(issue 591, 609, and 614) and more
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 56fc0e7..af503a6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -220,17 +220,15 @@
                 dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate());
             } else {
                 dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate());
             }
             LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index ee9dfae..b528381 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,8 @@
 
 import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -95,26 +93,6 @@
     }
 
     @Override
-    public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
-    public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
-    public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
-    public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
         return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 3e41a77..ccc5e85 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -127,8 +128,7 @@
                 splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                         new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()));
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
                 splitsAndConstraint.second);
@@ -180,7 +180,7 @@
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
                                 dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                 .getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
@@ -267,7 +267,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
             AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
                     splitsAndConstraint.second);
@@ -293,7 +293,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
             AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
                     splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index de4d075..e090c2c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -59,17 +59,16 @@
                 metadataProvider, physicalOptimizationConfig);
         return secondaryIndexCreator.buildLoadingJobSpec();
     }
-    
-    public static void addExternalDatasetFilesToMetadata(AqlMetadataProvider metadataProvider, 
-			Dataset dataset) throws AlgebricksException, MetadataException{
-			//get the file list
-			ArrayList<ExternalFile> files = metadataProvider.getExternalDatasetFiles(dataset);
-			//add files to metadata
-			for(int i=0; i < files.size(); i++)
-			{
-				MetadataManager.INSTANCE.addExternalFile(metadataProvider.getMetadataTxnContext(), files.get(i));
-			}
-	}
+
+    public static void addExternalDatasetFilesToMetadata(AqlMetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException, MetadataException {
+        //get the file list
+        ArrayList<ExternalFile> files = metadataProvider.getExternalDatasetFiles(dataset);
+        //add files to metadata
+        for (int i = 0; i < files.size(); i++) {
+            MetadataManager.INSTANCE.addExternalFile(metadataProvider.getMetadataTxnContext(), files.get(i));
+        }
+    }
 
     public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
             AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
@@ -86,9 +85,8 @@
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
-                                .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()));
         AlgebricksPartitionConstraintHelper
                 .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index baf16de..55cccbc 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -16,17 +16,16 @@
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
 import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-
 import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
 import edu.uci.ics.asterix.external.util.ExternalIndexHashPartitionComputerFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -60,24 +59,23 @@
         super(physOptConf, propertiesProvider);
     }
 
-	@Override
-	public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
-		JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-		AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
-		//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
-		ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
-				secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, true,
-				dataset.getDatasetId());
-		ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
-				localResourceMetadata, LocalResource.LSMBTreeResource);
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+        ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
+                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, true,
+                dataset.getDatasetId());
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.LSMBTreeResource);
         TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
                 secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
-                                .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -87,117 +85,112 @@
         return spec;
     }
 
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+            Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
+            AlgebricksMetaOperatorDescriptor asterixAssignOp;
+            try {
+                //create external indexing scan operator
+                RIDScanOpAndConstraints = createExternalIndexingOp(spec);
 
-	@Override
-	public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException{
-		if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-			JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-			Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
-			AlgebricksMetaOperatorDescriptor asterixAssignOp;
-			try
-			{
-				//create external indexing scan operator
-				RIDScanOpAndConstraints = createExternalIndexingOp(spec);
+                //create assign operator
+                asterixAssignOp = createExternalAssignOp(spec);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                        RIDScanOpAndConstraints.second);
+            } catch (Exception e) {
+                throw new AsterixException("Failed to create external index scanning and loading job");
+            }
 
-				//create assign operator
-				asterixAssignOp = createExternalAssignOp(spec);
-				AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-						RIDScanOpAndConstraints.second);
-			}
-			catch(Exception e)
-			{
-				throw new AsterixException("Failed to create external index scanning and loading job");
-			}
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+            }
 
-			// If any of the secondary fields are nullable, then add a select op that filters nulls.
-			AlgebricksMetaOperatorDescriptor selectOp = null;
-			if (anySecondaryKeyIsNullable) {
-				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
-			}
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc,
+                    RIDScanOpAndConstraints.second);
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            // Create secondary BTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numSecondaryKeys,
+                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+                                    .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+            IBinaryHashFunctionFactory[] hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(
+                    dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
 
-			// Sort by secondary keys.
-			ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc,RIDScanOpAndConstraints.second);
-			AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
-			// Create secondary BTree bulk load op.
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
-                spec,
-                numSecondaryKeys,
-                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
-                                .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
-			IBinaryHashFunctionFactory[] hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+            //select partitioning keys (always the first 2 after secondary keys)
+            int[] keys = new int[2];
+            keys[0] = numSecondaryKeys;
+            keys[1] = numSecondaryKeys + 1;
 
-			//select partitioning keys (always the first 2 after secondary keys)
-			int[] keys = new int[2];
-			keys[0] = numSecondaryKeys;
-			keys[1] = numSecondaryKeys + 1;
+            IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
 
-			IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
-					new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
+            spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(hashConn, selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(hashConn, asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.addRoot(secondaryBulkLoadOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
+        } else {
+            JobSpecification spec = JobSpecificationUtils.createJobSpecification();
 
-			spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
-			if (anySecondaryKeyIsNullable) {
-				spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
-				spec.connect(hashConn, selectOp, 0, sortOp, 0);
-			} else {
-				spec.connect(hashConn, asterixAssignOp, 0, sortOp, 0);
-			}
-			spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-			spec.addRoot(secondaryBulkLoadOp);
-			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-			return spec;
-		}
-		else
-		{
-			JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+            // Create dummy key provider for feeding the primary index scan. 
+            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
 
-			// Create dummy key provider for feeding the primary index scan. 
-			AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+            // Create primary index scan op.
+            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
 
-			// Create primary index scan op.
-			BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+            // Assign op.
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
 
-			// Assign op.
-			AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+            }
 
-			// If any of the secondary fields are nullable, then add a select op that filters nulls.
-			AlgebricksMetaOperatorDescriptor selectOp = null;
-			if (anySecondaryKeyIsNullable) {
-				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
-			}
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
 
-			// Sort by secondary keys.
-			ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            // Create secondary BTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numSecondaryKeys,
+                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+                                    .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
 
-			AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
-			// Create secondary BTree bulk load op.
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
-                spec,
-                numSecondaryKeys,
-                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
-                                .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
-
-			// Connect the operators.
-			spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-			spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-			if (anySecondaryKeyIsNullable) {
-				spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
-				spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
-			} else {
-				spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
-			}
-			spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-			spec.addRoot(secondaryBulkLoadOp);
-			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-			return spec;
-		}
-	}
+            // Connect the operators.
+            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.addRoot(secondaryBulkLoadOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
+        }
+    }
 }
-
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 5da336f..1418c32 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -19,9 +19,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -29,25 +26,30 @@
 import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
 import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -83,7 +85,6 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -91,7 +92,6 @@
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 
-
 @SuppressWarnings("rawtypes")
 // TODO: We should eventually have a hierarchy of classes that can create all
 // possible index job specs,
@@ -165,68 +165,67 @@
     public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
 
     protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
-                        throws AsterixException, AlgebricksException {
-                this.metadataProvider = metadataProvider;
-                dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
-                                : createIndexStmt.getDataverseName();
-                datasetName = createIndexStmt.getDatasetName();
-                secondaryIndexName = createIndexStmt.getIndexName();
-                dataset = metadataProvider.findDataset(dataverseName, datasetName);
-                if (dataset == null) {
-                        throw new AsterixException("Unknown dataset " + datasetName);
-                }
-                if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-                        //get external dataset details
-                        ExternalDatasetDetails edsd = ((ExternalDatasetDetails)dataset.getDatasetDetails());
-                        //get adapter name
-                        String adapter = edsd.getAdapter();
-                        //if not an hdfs adapter, throw an exception
-                        if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName()))
-                        {
-                                throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
-                        }
-                        //get the item type
-                        ARecordType externalItemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
-                        //number of primary keys here depends on the file input, 3 for rcfiles and 2 for text and sequence files.
-                        numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
-                        itemType = createExternalItemTypeWithRID(externalItemType);
-                        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-                        numSecondaryKeys = createIndexStmt.getKeyFields().size();
-                        //splits and constraints <--They don't exist-->
-                        primaryFileSplitProvider = null;
-                        primaryPartitionConstraint = null;
-                        //create secondary split and constraints
-                        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                                        .splitProviderAndPartitionConstraintsForExternalDataset(dataverseName, datasetName,
-                                                        secondaryIndexName);
-                        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-                        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-                        // Must be called in this order.
-                        setExternalRIDDescAndComparators();
-                        setExternalSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
-                        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
-                }
-                else
-                {
-                        itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
-                        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-                        numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-                        numSecondaryKeys = createIndexStmt.getKeyFields().size();
-                        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                                        .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
-                        primaryFileSplitProvider = primarySplitsAndConstraint.first;
-                        primaryPartitionConstraint = primarySplitsAndConstraint.second;
-                        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                                        .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
-                                                        secondaryIndexName);
-                        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-                        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-                        // Must be called in this order.
-                        setPrimaryRecDescAndComparators();
-                        setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
-                        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
-                }
+            throws AsterixException, AlgebricksException {
+        this.metadataProvider = metadataProvider;
+        dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+                : createIndexStmt.getDataverseName();
+        datasetName = createIndexStmt.getDatasetName();
+        secondaryIndexName = createIndexStmt.getIndexName();
+        dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Unknown dataset " + datasetName);
         }
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            //get external dataset details
+            ExternalDatasetDetails edsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
+            //get adapter name
+            String adapter = edsd.getAdapter();
+            //if not an hdfs adapter, throw an exception
+            if (!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName())) {
+                throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
+            }
+            //get the item type
+            ARecordType externalItemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(),
+                    dataset.getItemTypeName());
+            //number of primary keys here depends on the file input, 3 for rcfiles and 2 for text and sequence files.
+            numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
+            itemType = createExternalItemTypeWithRID(externalItemType);
+            payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+            numSecondaryKeys = createIndexStmt.getKeyFields().size();
+            //splits and constraints <--They don't exist-->
+            primaryFileSplitProvider = null;
+            primaryPartitionConstraint = null;
+            //create secondary split and constraints
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                    .splitProviderAndPartitionConstraintsForExternalDataset(dataverseName, datasetName,
+                            secondaryIndexName);
+            secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+            secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+            // Must be called in this order.
+            setExternalRIDDescAndComparators();
+            setExternalSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+            numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+        } else {
+            itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+            payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+            numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+            numSecondaryKeys = createIndexStmt.getKeyFields().size();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                    .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+                            datasetName);
+            primaryFileSplitProvider = primarySplitsAndConstraint.first;
+            primaryPartitionConstraint = primarySplitsAndConstraint.second;
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                    .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+                            secondaryIndexName);
+            secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+            secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+            // Must be called in this order.
+            setPrimaryRecDescAndComparators();
+            setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+            numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+        }
+    }
 
     protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
         List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
@@ -310,209 +309,195 @@
         return keyProviderOp;
     }
 
-protected ARecordType createExternalItemTypeWithRID(
-                        ARecordType externalItemType) throws AsterixException {
+    protected ARecordType createExternalItemTypeWithRID(ARecordType externalItemType) throws AsterixException {
 
-                String[] fieldsNames = new String[externalItemType.getFieldNames().length+numPrimaryKeys];
-                IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length+numPrimaryKeys];
+        String[] fieldsNames = new String[externalItemType.getFieldNames().length + numPrimaryKeys];
+        IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length + numPrimaryKeys];
 
-                //add RID fields names and types
-                if(AqlMetadataProvider.isOptimizeExternalIndexes())
-                {
-                        fieldsNames[0] = "_file-number";
-                        fieldsTypes[0] = BuiltinType.AINT32;
-                }
-                else
-                {
-                        fieldsNames[0] = "_file-name";
-                        fieldsTypes[0] = BuiltinType.ASTRING;
-                }
-                fieldsNames[1] = "_byte-location";
-                fieldsTypes[1] = BuiltinType.AINT64;
-                if(numPrimaryKeys == 3)
-                {       
-                        //add the row number for rc files
-                        fieldsNames[2] = "_row-number";
-                        fieldsTypes[2] = BuiltinType.AINT32;
-                }
-                
-                //add the original fields names and types
-                for(int i=0; i < externalItemType.getFieldNames().length; i++)
-                {
-                        fieldsNames[i+numPrimaryKeys] = externalItemType.getFieldNames()[i];
-                        fieldsTypes[i+numPrimaryKeys] = externalItemType.getFieldTypes()[i];
-                }
-                return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+        //add RID fields names and types
+        if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+            fieldsNames[0] = "_file-number";
+            fieldsTypes[0] = BuiltinType.AINT32;
+        } else {
+            fieldsNames[0] = "_file-name";
+            fieldsTypes[0] = BuiltinType.ASTRING;
+        }
+        fieldsNames[1] = "_byte-location";
+        fieldsTypes[1] = BuiltinType.AINT64;
+        if (numPrimaryKeys == 3) {
+            //add the row number for rc files
+            fieldsNames[2] = "_row-number";
+            fieldsTypes[2] = BuiltinType.AINT32;
         }
 
-        protected void setExternalRIDDescAndComparators() throws AlgebricksException {
+        //add the original fields names and types
+        for (int i = 0; i < externalItemType.getFieldNames().length; i++) {
+            fieldsNames[i + numPrimaryKeys] = externalItemType.getFieldNames()[i];
+            fieldsTypes[i + numPrimaryKeys] = externalItemType.getFieldTypes()[i];
+        }
+        return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+    }
 
-                ISerializerDeserializer[] externalRecFields = new ISerializerDeserializer[itemType.getFieldNames().length];
-                ITypeTraits[] externalTypeTraits = new ITypeTraits[itemType.getFieldNames().length];
+    protected void setExternalRIDDescAndComparators() throws AlgebricksException {
 
-                primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-                primaryBloomFilterKeyFields = new int[numPrimaryKeys];
-                ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
-                
-                if(AqlMetadataProvider.isOptimizeExternalIndexes())
-                {
-                        primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
-                }
-                else
-                {
-                        primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
-                }
-                primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true);
+        ISerializerDeserializer[] externalRecFields = new ISerializerDeserializer[itemType.getFieldNames().length];
+        ITypeTraits[] externalTypeTraits = new ITypeTraits[itemType.getFieldNames().length];
 
-                primaryBloomFilterKeyFields[0]=0;
-                primaryBloomFilterKeyFields[1]=1;
+        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
 
-                if(numPrimaryKeys == 3)
-                {
-                        primaryComparatorFactories[2] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
-                        primaryBloomFilterKeyFields[2]=2;
-                }
+        if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+            primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    BuiltinType.AINT32, true);
+        } else {
+            primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    BuiltinType.ASTRING, true);
+        }
+        primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                BuiltinType.AINT64, true);
 
-                for(int i=0; i < itemType.getFieldNames().length; i++)
-                {
-                        externalRecFields[i] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[i]); 
-                        externalTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType.getFieldTypes()[i]);
-                }
-                primaryRecDesc = new RecordDescriptor(externalRecFields, externalTypeTraits);
+        primaryBloomFilterKeyFields[0] = 0;
+        primaryBloomFilterKeyFields[1] = 1;
+
+        if (numPrimaryKeys == 3) {
+            primaryComparatorFactories[2] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    BuiltinType.AINT32, true);
+            primaryBloomFilterKeyFields[2] = 2;
         }
 
-protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
-                        AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
-                secondaryKeyFields = createIndexStmt.getKeyFields();
-                secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys+ numPrimaryKeys];
-                secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
-                secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
-                ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-                ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-                ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
-                ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
-                IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
-                                .getBinaryComparatorFactoryProvider();
+        for (int i = 0; i < itemType.getFieldNames().length; i++) {
+            externalRecFields[i] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[i]);
+            externalTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType.getFieldTypes()[i]);
+        }
+        primaryRecDesc = new RecordDescriptor(externalRecFields, externalTypeTraits);
+    }
 
-                for (int i = 0; i < numSecondaryKeys; i++) {
-                        secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                                        itemType, secondaryKeyFields.get(i), 0);
-                        Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
-                        IAType keyType = keyTypePair.first;
-                        anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
-                        ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-                        secondaryRecFields[i] = keySerde;
-                        secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
-                        secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
-                        secondaryBloomFilterKeyFields[i] = i;
-                }
+    protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+        secondaryKeyFields = createIndexStmt.getKeyFields();
+        secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numPrimaryKeys];
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
+        IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
+                .getBinaryComparatorFactoryProvider();
 
-                if(AqlMetadataProvider.isOptimizeExternalIndexes())
-                {
-                        secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                                itemType, "_file-number", 0);
-                }
-                else
-                {
-                        secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                                        itemType, "_file-name", 0);
-                }
-                secondaryFieldAccessEvalFactories[numSecondaryKeys+1] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                                itemType, "_byte-location", 0);
-                if(numPrimaryKeys == 3)
-                {
-                        secondaryFieldAccessEvalFactories[numSecondaryKeys+2] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                                        itemType, "_row-number", 0);
-                }
-
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                        secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
-                        secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-                        secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-                }
-                secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                    itemType, secondaryKeyFields.get(i), 0);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+            IAType keyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+            secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+            secondaryBloomFilterKeyFields[i] = i;
         }
 
-protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(JobSpecification spec) throws Exception {
-                Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
-                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
-                                indexingOpAndConstraints.second);
-                return indexingOpAndConstraints;
+        if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, "_file-number", 0);
+        } else {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, "_file-name", 0);
+        }
+        secondaryFieldAccessEvalFactories[numSecondaryKeys + 1] = metadataProvider.getFormat()
+                .getFieldAccessEvaluatorFactory(itemType, "_byte-location", 0);
+        if (numPrimaryKeys == 3) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys + 2] = metadataProvider.getFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, "_row-number", 0);
         }
 
-protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec) throws AlgebricksException {
-                int[] outColumns = new int[numSecondaryKeys + numPrimaryKeys];
-                int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-                for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
-                        outColumns[i] = i;
-                        projectionList[i] = i;
-                }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+    }
 
-                IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-                        sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
-                                        secondaryFieldAccessEvalFactories[i]);
-                }
-                AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
-                AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-                return asterixAssignOp;
+    protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
+            JobSpecification spec) throws Exception {
+        Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider
+                .buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
+                indexingOpAndConstraints.second);
+        return indexingOpAndConstraints;
+    }
+
+    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec) throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeys + numPrimaryKeys];
+        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+            outColumns[i] = i;
+            projectionList[i] = i;
         }
 
-        protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
-                        IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc,
-                        AlgebricksPartitionConstraint partitionConstraints) {
-                int[] sortFields = new int[secondaryComparatorFactories.length];
-                for (int i = 0; i < secondaryComparatorFactories.length; i++) {
-                        sortFields[i] = i;
-                }
-                ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
-                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, partitionConstraints);
-                return sortOp;
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+                    secondaryFieldAccessEvalFactories[i]);
+        }
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        return asterixAssignOp;
+    }
+
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc,
+            AlgebricksPartitionConstraint partitionConstraints) {
+        int[] sortFields = new int[secondaryComparatorFactories.length];
+        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, partitionConstraints);
+        return sortOp;
+    }
+
+    protected ARecordType createSecondaryItemType(ARecordType externalItemType, boolean isRCFile)
+            throws AsterixException {
+
+        String[] fieldsNames = new String[numSecondaryKeys + numPrimaryKeys];
+        IAType[] fieldsTypes = new IAType[numSecondaryKeys + numPrimaryKeys];
+
+        //first create the secondary index fields
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            fieldsNames[i] = secondaryKeyFields.get(i);
+            try {
+                fieldsTypes[i] = externalItemType.getFieldType(fieldsNames[i]);
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                throw new AsterixException(e);
+            }
         }
 
-protected ARecordType createSecondaryItemType(ARecordType externalItemType, boolean isRCFile) throws AsterixException
-        {
-
-                String[] fieldsNames = new String[numSecondaryKeys+numPrimaryKeys];
-                IAType[] fieldsTypes = new IAType[numSecondaryKeys+numPrimaryKeys];
-
-                //first create the secondary index fields
-                for(int i=0; i<numSecondaryKeys; i++)
-                {
-                        fieldsNames[i] = secondaryKeyFields.get(i);
-                        try {
-                                fieldsTypes[i] = externalItemType.getFieldType(fieldsNames[i]);
-                        } catch (IOException e) {
-                                // TODO Auto-generated catch block
-                                throw new AsterixException(e);
-                        }
-                }
-
-                //second add RID fields (File name or number and byte location)
-                if(AqlMetadataProvider.isOptimizeExternalIndexes())
-                {
-                        fieldsNames[numSecondaryKeys] = "_file-number";
-                        fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
-                }
-                else
-                {
-                        fieldsNames[numSecondaryKeys] = "_file-name";
-                        fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
-                }
-                fieldsNames[numSecondaryKeys+1] = "_byte-location";
-                fieldsTypes[numSecondaryKeys+1] = BuiltinType.AINT64;
-
-                if(isRCFile)
-                {
-                        fieldsNames[numSecondaryKeys+2] = "_row-Number";
-                        fieldsTypes[numSecondaryKeys+2] = BuiltinType.AINT32;
-                }
-
-                //return type
-                return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+        //second add RID fields (File name or number and byte location)
+        if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+            fieldsNames[numSecondaryKeys] = "_file-number";
+            fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+        } else {
+            fieldsNames[numSecondaryKeys] = "_file-name";
+            fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
         }
+        fieldsNames[numSecondaryKeys + 1] = "_byte-location";
+        fieldsTypes[numSecondaryKeys + 1] = BuiltinType.AINT64;
+
+        if (isRCFile) {
+            fieldsNames[numSecondaryKeys + 2] = "_row-Number";
+            fieldsTypes[numSecondaryKeys + 2] = BuiltinType.AINT32;
+        }
+
+        //return type
+        return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+    }
 
     protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
         // -Infinity
@@ -537,7 +522,7 @@
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
                                 dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                 primaryPartitionConstraint);
@@ -633,4 +618,3 @@
         return asterixSelectOp;
     }
 }
-
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 6f64aa2..a3261e6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -283,23 +283,23 @@
         if (!isPartitioned) {
             return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                     dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                            dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate());
         } else {
             return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                     dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                            dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate());
         }
     }
-    
+
     @Override
-	protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
-			AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
-    	throw new AsterixException("Cannot create inverted index on external dataset due to composite RID Fields.");
-	}
+    protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+        throw new AsterixException("Cannot create inverted index on external dataset due to composite RID Fields.");
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 6ce694c..ebfee66 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -102,10 +102,9 @@
                 new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
                         primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
-                                keyType, secondaryComparatorFactories.length), storageProperties
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                secondaryComparatorFactories.length), storageProperties
                                 .getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -161,165 +160,159 @@
     }
 
     @Override
-	protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
-			AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
-		secondaryKeyFields = createIndexStmt.getKeyFields();
-		if (numSecondaryKeys != 1) {
-			throw new AsterixException(
-					"Cannot use "
-							+ numSecondaryKeys
-							+ " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
-		}
-		Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
-		IAType spatialType = spatialTypePair.first;
-		anySecondaryKeyIsNullable = spatialTypePair.second;
-		if (spatialType == null) {
-			throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-		}
-		int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-		numNestedSecondaryKeyFields = numDimensions * 2;
-		secondaryFieldAccessEvalFactories = metadataProvider.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
-				numPrimaryKeys, numDimensions);
-		secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-		valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-		ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-		                                                                           + numNestedSecondaryKeyFields];
-		ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-		IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-		keyType = nestedKeyType.getTypeTag();
-		for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-			ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-					.getSerializerDeserializer(nestedKeyType);
-			secondaryRecFields[i] = keySerde;
-			secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-					nestedKeyType, true);
-			secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-			valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-		}
+    protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+        secondaryKeyFields = createIndexStmt.getKeyFields();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException(
+                    "Cannot use "
+                            + numSecondaryKeys
+                            + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+        }
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        secondaryFieldAccessEvalFactories = metadataProvider.getFormat().createMBRFactory(itemType,
+                secondaryKeyFields.get(0), numPrimaryKeys, numDimensions);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, true);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
 
-		// Add serializers and comparators for primary index fields.
-		for (int i = 0; i < numPrimaryKeys; i++) {
-			secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
-			secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
-		}
-		secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
-	}
-    
+        // Add serializers and comparators for primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+    }
+
     @Override
     public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-			Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
-			AlgebricksMetaOperatorDescriptor asterixAssignOp;
-			try
-			{
-				//create external indexing scan operator
-				RIDScanOpAndConstraints = createExternalIndexingOp(spec);
-				//create assign operator
-				asterixAssignOp = createExternalAssignOp(spec);
-				AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-						RIDScanOpAndConstraints.second);
-			}
-			catch(Exception e)
-			{
-				throw new AsterixException("Failed to create external index scanning and loading job");
-			}
+            Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
+            AlgebricksMetaOperatorDescriptor asterixAssignOp;
+            try {
+                //create external indexing scan operator
+                RIDScanOpAndConstraints = createExternalIndexingOp(spec);
+                //create assign operator
+                asterixAssignOp = createExternalAssignOp(spec);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                        RIDScanOpAndConstraints.second);
+            } catch (Exception e) {
+                throw new AsterixException("Failed to create external index scanning and loading job");
+            }
 
-			// If any of the secondary fields are nullable, then add a select op that filters nulls.
-			AlgebricksMetaOperatorDescriptor selectOp = null;
-			if (anySecondaryKeyIsNullable) {
-				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
-			}
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+            }
 
-			// Create secondary RTree bulk load op.
-			AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
-			TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
-					spec,
-					numNestedSecondaryKeyFields,
-					new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-	                        primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-	                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-	                                LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-	                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-	                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
-	                                keyType, secondaryComparatorFactories.length), storageProperties
-	                                .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
-			// Connect the operators.
-			// Create a hash partitioning connector
-			ExternalDatasetDetails edsd = (ExternalDatasetDetails)dataset.getDatasetDetails();
-			IBinaryHashFunctionFactory[] hashFactories = null;
-			if(edsd.getProperties().get(HDFSAdapterFactory.KEY_INPUT_FORMAT).trim().equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
-			{
-				hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
-			}
-			else
-			{
-				hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
-			}	 
-			//select partitioning keys (always the first 2 after secondary keys)
-			int[] keys = new int[2];
-			keys[0] = numSecondaryKeys;
-			keys[1] = numSecondaryKeys + 1;
+            // Create secondary RTree bulk load op.
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numNestedSecondaryKeyFields,
+                    new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                            primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                    secondaryComparatorFactories.length), storageProperties
+                                    .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+            // Connect the operators.
+            // Create a hash partitioning connector
+            ExternalDatasetDetails edsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            IBinaryHashFunctionFactory[] hashFactories = null;
+            if (edsd.getProperties().get(HDFSAdapterFactory.KEY_INPUT_FORMAT).trim()
+                    .equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
+                hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset,
+                        NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+            } else {
+                hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset,
+                        NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+            }
+            //select partitioning keys (always the first 2 after secondary keys)
+            int[] keys = new int[2];
+            keys[0] = numSecondaryKeys;
+            keys[1] = numSecondaryKeys + 1;
 
-			IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
-					new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
-			spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
-			if (anySecondaryKeyIsNullable) {
-				spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
-				spec.connect(hashConn, selectOp, 0, secondaryBulkLoadOp, 0);
-			} else {
-				spec.connect(hashConn, asterixAssignOp, 0, secondaryBulkLoadOp, 0);
-			}
-			spec.addRoot(secondaryBulkLoadOp);
-			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-			return spec;
-		}
-		else
-		{
-
-        // Create dummy key provider for feeding the primary index scan. 
-        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-
-        // Create primary index scan op.
-        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
-
-        // Assign op.
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp,
-                numNestedSecondaryKeyFields);
-
-        // If any of the secondary fields are nullable, then add a select op that filters nulls.
-        AlgebricksMetaOperatorDescriptor selectOp = null;
-        if (anySecondaryKeyIsNullable) {
-            selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
-        }
-
-        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
-        // Create secondary RTree bulk load op.
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
-                spec,
-                numNestedSecondaryKeyFields,
-                new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                        primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
-                                keyType, secondaryComparatorFactories.length), storageProperties
-                                .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
-
-        // Connect the operators.
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-        if (anySecondaryKeyIsNullable) {
-            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, secondaryBulkLoadOp, 0);
+            IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
+            spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(hashConn, selectOp, 0, secondaryBulkLoadOp, 0);
+            } else {
+                spec.connect(hashConn, asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+            }
+            spec.addRoot(secondaryBulkLoadOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
         } else {
-            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+
+            // Create dummy key provider for feeding the primary index scan. 
+            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+            // Create primary index scan op.
+            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+            // Assign op.
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp,
+                    numNestedSecondaryKeyFields);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable) {
+                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
+            }
+
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            // Create secondary RTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numNestedSecondaryKeyFields,
+                    new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                            primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                    secondaryComparatorFactories.length), storageProperties
+                                    .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+
+            // Connect the operators.
+            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, secondaryBulkLoadOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+            }
+            spec.addRoot(secondaryBulkLoadOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
         }
-        spec.addRoot(secondaryBulkLoadOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-		}
     }
 }
-
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index 3610478..7e7ffd9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -17,52 +17,18 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 
 public class BaseOperationTracker implements ILSMOperationTracker {
 
     protected final DatasetLifecycleManager datasetLifecycleManager;
-    protected final ILSMIOOperationCallback ioOpCallback;
-    protected long lastLSN;
-    protected long firstLSN;
     protected final int datasetID;
 
-    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
+    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
         this.datasetLifecycleManager = datasetLifecycleManager;
-        this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
-                .createIOOperationCallback(this);
         this.datasetID = datasetID;
-        resetLSNs();
-    }
-
-    public ILSMIOOperationCallback getIOOperationCallback() {
-        return ioOpCallback;
-    }
-
-    public long getLastLSN() {
-        return lastLSN;
-    }
-
-    public long getFirstLSN() {
-        return firstLSN;
-    }
-
-    public void updateLastLSN(long lastLSN) {
-        if (firstLSN == -1) {
-            firstLSN = lastLSN;
-        }
-        this.lastLSN = Math.max(this.lastLSN, lastLSN);
-    }
-
-    public void resetLSNs() {
-        lastLSN = -1;
-        firstLSN = -1;
     }
 
     @Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 617b6ff..6335fb1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -26,7 +26,6 @@
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -253,7 +252,7 @@
         if (iInfo.isOpen) {
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
+            accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
         }
         // Wait for the above flush op.
         while (dsInfo.numActiveIOOps > 0) {
@@ -316,8 +315,7 @@
         synchronized (datasetOpTrackers) {
             ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
             if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(this, datasetID,
-                        LSMBTreeIOOperationCallbackFactory.INSTANCE);
+                opTracker = new PrimaryIndexOperationTracker(this, datasetID);
                 datasetOpTrackers.put(datasetID, opTracker);
             }
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 2ed4b0ec..92a33cd 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
@@ -33,9 +32,8 @@
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
 
-    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
-        super(datasetLifecycleManager, ioOpCallbackFactory, datasetID);
+    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+        super(datasetLifecycleManager, datasetID);
         this.numActiveOperations = new AtomicInteger();
     }
 
@@ -88,8 +86,7 @@
                 for (ILSMIndex lsmIndex : indexes) {
                     ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
                             NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-                    accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker())
-                            .getIOOperationCallback());
+                    accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
                 }
             }
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index da08cd8..4ebd921 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -17,7 +17,6 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
@@ -29,10 +28,11 @@
 
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
 
-    protected final BaseOperationTracker opTracker;
+    protected long firstLSN;
+    protected long lastLSN;
 
-    public AbstractLSMIOOperationCallback(BaseOperationTracker opTracker) {
-        this.opTracker = opTracker;
+    public AbstractLSMIOOperationCallback() {
+        resetLSNs();
     }
 
     @Override
@@ -42,7 +42,7 @@
 
     @Override
     public void afterFinalize(ILSMComponent newComponent) {
-        opTracker.resetLSNs();
+        resetLSNs();
     }
 
     public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
@@ -80,4 +80,25 @@
             bufferCache.unpin(metadataPage);
         }
     }
+
+    protected void resetLSNs() {
+        firstLSN = -1;
+        lastLSN = -1;
+    }
+    
+    public void updateLastLSN(long lastLSN) {
+        if (firstLSN == -1) {
+            firstLSN = lastLSN;
+        }
+        this.lastLSN = Math.max(this.lastLSN, lastLSN);
+    }
+    
+    public long getFirstLSN() {
+        return firstLSN;
+    }
+    
+    public long getLastLSN() {
+        return lastLSN;
+    }
+
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index b6025cb..d2eb5ba 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -25,8 +25,8 @@
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeIOOperationCallback(BaseOperationTracker opTracker) {
-        super(opTracker);
+    public LSMBTreeIOOperationCallback() {
+        super();
     }
 
     @Override
@@ -42,7 +42,7 @@
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
-            return opTracker.getLastLSN();
+            return lastLSN;
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
         long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 92ba9ec..1028015 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.common.ioopcallbacks;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 
@@ -29,7 +28,7 @@
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
-        return new LSMBTreeIOOperationCallback((BaseOperationTracker) syncObj);
+    public ILSMIOOperationCallback createIOOperationCallback() {
+        return new LSMBTreeIOOperationCallback();
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 4f99ae6..60d4af1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -17,15 +17,14 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMInvertedIndexIOOperationCallback(BaseOperationTracker opTracker) {
-        super(opTracker);
+    public LSMInvertedIndexIOOperationCallback() {
+        super();
     }
 
     @Override
@@ -41,7 +40,7 @@
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
-            return opTracker.getLastLSN();
+            return lastLSN;
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
         long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index c20cdb3..5dc0c0b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.common.ioopcallbacks;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 
@@ -29,7 +28,7 @@
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
-        return new LSMInvertedIndexIOOperationCallback((BaseOperationTracker) syncObj);
+    public ILSMIOOperationCallback createIOOperationCallback() {
+        return new LSMInvertedIndexIOOperationCallback();
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index cd7b7a0..770d514 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -17,15 +17,14 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMRTreeIOOperationCallback(BaseOperationTracker opTracker) {
-        super(opTracker);
+    public LSMRTreeIOOperationCallback() {
+        super();
     }
 
     @Override
@@ -42,7 +41,7 @@
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
-            return opTracker.getLastLSN();
+            return lastLSN;
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
         long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 0cd2539..841a1d5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.common.ioopcallbacks;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 
@@ -29,7 +28,7 @@
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
-        return new LSMRTreeIOOperationCallback((BaseOperationTracker) syncObj);
+    public ILSMIOOperationCallback createIOOperationCallback() {
+        return new LSMRTreeIOOperationCallback();
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 2f522b9..2638c9f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -18,7 +18,6 @@
 
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -44,14 +43,6 @@
 
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
 
-    public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary);
-
-    public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider();
-
-    public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider();
-
-    public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider();
-
     public ILSMIOOperationScheduler getLSMIOScheduler();
 
     public ILocalResourceRepository getLocalResourceRepository();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index ed89cfc..0c64497 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -336,11 +336,12 @@
         AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
         ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
                 .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
-                LSMBTreeIOOperationCallbackFactory.INSTANCE, index.getDatasetId().getId());
+                index.getDatasetId().getId());
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
-                    runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(), rtcProvider);
+                    runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(),
+                    LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
             lsmBtree.create();
             resourceID = runtimeContext.getResourceIdFactory().createId();
             ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
@@ -359,7 +360,7 @@
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
                         runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
                         opTracker, runtimeContext.getLSMIOScheduler(),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
                 indexLifecycleManager.register(resourceID, lsmBtree);
             }
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 078b340d..de94045 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -257,7 +257,7 @@
     public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
         return resultSerializerFactoryProvider;
     }
-    
+
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
     }
@@ -275,12 +275,12 @@
     }
 
     public static boolean isOptimizeExternalIndexes() {
-		return optimizeExternalIndexes;
-	}
-    
+        return optimizeExternalIndexes;
+    }
+
     public static void setOptimizeExternalIndexes(boolean optimizeExternalIndexes) {
-		AqlMetadataProvider.optimizeExternalIndexes = optimizeExternalIndexes;
-	}
+        AqlMetadataProvider.optimizeExternalIndexes = optimizeExternalIndexes;
+    }
 
     @Override
     public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
@@ -419,215 +419,206 @@
 
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
     }
-    
+
     @SuppressWarnings("rawtypes")
-	public Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataIndexingRuntime(
-			JobSpecification jobSpec, IAType itemType, Dataset dataset, IDataFormat format)
-					throws AlgebricksException {
-		IGenericDatasetAdapterFactory adapterFactory;
-		IDatasourceAdapter adapter;
-		String adapterName;
-		DatasourceAdapter adapterEntity;
-		String adapterFactoryClassname;
-		ExternalDatasetDetails datasetDetails = null;
-		try {
-			datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-			adapterName = datasetDetails.getAdapter();
-			adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-					adapterName);
-			if (adapterEntity != null) {
-				adapterFactoryClassname = adapterEntity.getClassname();
-				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-			} else {
-				adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
-				if (adapterFactoryClassname == null) {
-					throw new AlgebricksException(" Unknown adapter :" + adapterName);
-				}
-				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-			}
+    public Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataIndexingRuntime(
+            JobSpecification jobSpec, IAType itemType, Dataset dataset, IDataFormat format) throws AlgebricksException {
+        IGenericDatasetAdapterFactory adapterFactory;
+        IDatasourceAdapter adapter;
+        String adapterName;
+        DatasourceAdapter adapterEntity;
+        String adapterFactoryClassname;
+        ExternalDatasetDetails datasetDetails = null;
+        try {
+            datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            adapterName = datasetDetails.getAdapter();
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    adapterName);
+            if (adapterEntity != null) {
+                adapterFactoryClassname = adapterEntity.getClassname();
+                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            } else {
+                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+                if (adapterFactoryClassname == null) {
+                    throw new AlgebricksException(" Unknown adapter :" + adapterName);
+                }
+                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            }
 
-			adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
-					wrapProperties(datasetDetails.getProperties()), itemType, null);
-		} catch (AlgebricksException ae) {
-			throw ae;
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException("Unable to create adapter " + e);
-		}
-		if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
-				IDatasourceAdapter.AdapterType.READ_WRITE))) {
-			throw new AlgebricksException("external dataset adapter does not support read operation");
-		}
-		ARecordType rt = (ARecordType) itemType;
-		ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-		RecordDescriptor indexerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-		ExternalDataIndexingOperatorDescriptor dataIndexScanner = null;
-		List<ExternalFile> files = null;
-		HashMap<String, Integer> filesNumbers = null;
-		if(optimizeExternalIndexes)
-		{
-			try {
-				files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
-			} catch (MetadataException e) {
-				e.printStackTrace();
-				throw new AlgebricksException("Unable to get list of external files from metadata " + e);
-			}
-			
-			filesNumbers = new HashMap<String,Integer>();
-			for(int i=0; i< files.size(); i++)
-			{
-				filesNumbers.put(files.get(i).getFileName(), files.get(i).getFileNumber());
-			}
-			
-			dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
-					wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory,filesNumbers);
-		}
-		else
-		{
-		dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
-				wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory,filesNumbers);
-		}
-		AlgebricksPartitionConstraint constraint;
-		try {
-			constraint = adapter.getPartitionConstraint();
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
-		return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner, constraint);
-	}
-    
-    public ArrayList<ExternalFile> getExternalDatasetFiles(Dataset dataset) throws AlgebricksException
-	{
-    	ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
-		if(dataset.getDatasetType() != DatasetType.EXTERNAL)
-		{
-			throw new AlgebricksException("Can only get external dataset files");
-		}
-		ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails)dataset.getDatasetDetails();
-		IGenericDatasetAdapterFactory adapterFactory;
-		IDatasourceAdapter adapter;
-		String adapterName;
-		DatasourceAdapter adapterEntity;
-		String adapterFactoryClassname;
-		try {
-			adapterName = datasetDetails.getAdapter();
-			adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-					adapterName);
-			if (adapterEntity != null) {
-				adapterFactoryClassname = adapterEntity.getClassname();
-				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-			} else {
-				adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
-				if (adapterFactoryClassname == null) {
-					throw new AlgebricksException(" Unknown adapter :" + adapterName);
-				}
-				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-			}
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
+                    wrapProperties(datasetDetails.getProperties()), itemType, null);
+        } catch (AlgebricksException ae) {
+            throw ae;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Unable to create adapter " + e);
+        }
+        if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+                IDatasourceAdapter.AdapterType.READ_WRITE))) {
+            throw new AlgebricksException("external dataset adapter does not support read operation");
+        }
+        ARecordType rt = (ARecordType) itemType;
+        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor indexerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+        ExternalDataIndexingOperatorDescriptor dataIndexScanner = null;
+        List<ExternalFile> files = null;
+        HashMap<String, Integer> filesNumbers = null;
+        if (optimizeExternalIndexes) {
+            try {
+                files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+            } catch (MetadataException e) {
+                e.printStackTrace();
+                throw new AlgebricksException("Unable to get list of external files from metadata " + e);
+            }
 
-			adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-					wrapProperties(datasetDetails.getProperties()), null);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException("Unable to create adapter " + e);
-		}
-		
-		try {
-			ArrayList<FileStatus> fileStatuses = ExternalDataFilesMetadataProvider.getHDFSFileStatus((AbstractDatasourceAdapter) adapter);
-			for(int i=0; i<fileStatuses.size(); i++)
-			{
-				files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), new Date(fileStatuses.get(i).getModificationTime()),
-						fileStatuses.get(i).getLen(),
-						fileStatuses.get(i).getPath().toUri().getPath(),
-						i));
-			}
-			return files;
-		} catch (IOException e) {
-			e.printStackTrace();
-			throw new AlgebricksException("Unable to get list of HDFS files " + e);
-		}
-	}
+            filesNumbers = new HashMap<String, Integer>();
+            for (int i = 0; i < files.size(); i++) {
+                filesNumbers.put(files.get(i).getFileName(), files.get(i).getFileNumber());
+            }
 
-	@SuppressWarnings("rawtypes")
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataAccesByRIDRuntime(
-			JobSpecification jobSpec, Dataset dataset, Index secondaryIndex)
-					throws AlgebricksException {
-		IAType itemType = null;
-		try {
-			itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
-		} catch (MetadataException e) {
-			e.printStackTrace();
-			throw new AlgebricksException("Unable to get item type from metadata " + e);
-		}
-		if (itemType.getTypeTag() != ATypeTag.RECORD) {
-			throw new AlgebricksException("Can only scan datasets of records.");
-		}
+            dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+                    wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory, filesNumbers);
+        } else {
+            dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+                    wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory, filesNumbers);
+        }
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapter.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner,
+                constraint);
+    }
 
-		ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails)dataset.getDatasetDetails();
-		IGenericDatasetAdapterFactory adapterFactory;
-		IDatasourceAdapter adapter;
-		String adapterName;
-		DatasourceAdapter adapterEntity;
-		String adapterFactoryClassname;
-		try {
-			adapterName = datasetDetails.getAdapter();
-			adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-					adapterName);
-			if (adapterEntity != null) {
-				adapterFactoryClassname = adapterEntity.getClassname();
-				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-			} else {
-				adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
-				if (adapterFactoryClassname == null) {
-					throw new AlgebricksException(" Unknown adapter :" + adapterName);
-				}
-				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-			}
+    public ArrayList<ExternalFile> getExternalDatasetFiles(Dataset dataset) throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
+            throw new AlgebricksException("Can only get external dataset files");
+        }
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        IGenericDatasetAdapterFactory adapterFactory;
+        IDatasourceAdapter adapter;
+        String adapterName;
+        DatasourceAdapter adapterEntity;
+        String adapterFactoryClassname;
+        try {
+            adapterName = datasetDetails.getAdapter();
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    adapterName);
+            if (adapterEntity != null) {
+                adapterFactoryClassname = adapterEntity.getClassname();
+                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            } else {
+                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+                if (adapterFactoryClassname == null) {
+                    throw new AlgebricksException(" Unknown adapter :" + adapterName);
+                }
+                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            }
 
-			adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-					wrapProperties(datasetDetails.getProperties()), itemType);
-		} catch (AlgebricksException ae) {
-			throw ae;
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException("Unable to create adapter " + e);
-		}
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+                    wrapProperties(datasetDetails.getProperties()), null);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Unable to create adapter " + e);
+        }
 
-		if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
-				IDatasourceAdapter.AdapterType.READ_WRITE))) {
-			throw new AlgebricksException("external dataset adapter does not support read operation");
-		}
-		IDataFormat format = NonTaggedDataFormat.INSTANCE;
-		ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-		RecordDescriptor outRecDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+        try {
+            ArrayList<FileStatus> fileStatuses = ExternalDataFilesMetadataProvider
+                    .getHDFSFileStatus((AbstractDatasourceAdapter) adapter);
+            for (int i = 0; i < fileStatuses.size(); i++) {
+                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), new Date(fileStatuses
+                        .get(i).getModificationTime()), fileStatuses.get(i).getLen(), fileStatuses.get(i).getPath()
+                        .toUri().getPath(), i));
+            }
+            return files;
+        } catch (IOException e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Unable to get list of HDFS files " + e);
+        }
+    }
 
-		ExternalDataAccessByRIDOperatorDescriptor dataAccessOperator = null;
-		if(optimizeExternalIndexes)
-		{
-			//create the hashmap
-			List<ExternalFile> files=null;
-			try {
-				files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
-			} catch (MetadataException e) {
-				e.printStackTrace();
-				throw new AlgebricksException("Couldn't get file names for access by optimized RIDs",e);
-			}
-			HashMap<Integer, String> filesMapping = new HashMap<Integer, String>();
-			for(int i=0; i < files.size(); i++)
-			{
-				filesMapping.put(files.get(i).getFileNumber(), files.get(i).getFileName());
-			}
-			dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
-					itemType, outRecDesc, adapterFactory, filesMapping);
-		}
-		else
-		{
-			dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
-					itemType, outRecDesc, adapterFactory, null);
-		}
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),dataset.getDatasetName(),secondaryIndex.getIndexName());
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessOperator, splitsAndConstraints.second);
-	}
+    @SuppressWarnings("rawtypes")
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataAccesByRIDRuntime(
+            JobSpecification jobSpec, Dataset dataset, Index secondaryIndex) throws AlgebricksException {
+        IAType itemType = null;
+        try {
+            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getItemTypeName()).getDatatype();
+        } catch (MetadataException e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Unable to get item type from metadata " + e);
+        }
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only scan datasets of records.");
+        }
+
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        IGenericDatasetAdapterFactory adapterFactory;
+        IDatasourceAdapter adapter;
+        String adapterName;
+        DatasourceAdapter adapterEntity;
+        String adapterFactoryClassname;
+        try {
+            adapterName = datasetDetails.getAdapter();
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    adapterName);
+            if (adapterEntity != null) {
+                adapterFactoryClassname = adapterEntity.getClassname();
+                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            } else {
+                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+                if (adapterFactoryClassname == null) {
+                    throw new AlgebricksException(" Unknown adapter :" + adapterName);
+                }
+                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            }
+
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+                    wrapProperties(datasetDetails.getProperties()), itemType);
+        } catch (AlgebricksException ae) {
+            throw ae;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Unable to create adapter " + e);
+        }
+
+        if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+                IDatasourceAdapter.AdapterType.READ_WRITE))) {
+            throw new AlgebricksException("external dataset adapter does not support read operation");
+        }
+        IDataFormat format = NonTaggedDataFormat.INSTANCE;
+        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor outRecDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+        ExternalDataAccessByRIDOperatorDescriptor dataAccessOperator = null;
+        if (optimizeExternalIndexes) {
+            //create the hashmap
+            List<ExternalFile> files = null;
+            try {
+                files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+            } catch (MetadataException e) {
+                e.printStackTrace();
+                throw new AlgebricksException("Couldn't get file names for access by optimized RIDs", e);
+            }
+            HashMap<Integer, String> filesMapping = new HashMap<Integer, String>();
+            for (int i = 0; i < files.size(); i++) {
+                filesMapping.put(files.get(i).getFileNumber(), files.get(i).getFileName());
+            }
+            dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec,
+                    wrapPropertiesEmpty(datasetDetails.getProperties()), itemType, outRecDesc, adapterFactory,
+                    filesMapping);
+        } else {
+            dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec,
+                    wrapPropertiesEmpty(datasetDetails.getProperties()), itemType, outRecDesc, adapterFactory, null);
+        }
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = splitProviderAndPartitionConstraintsForExternalDataset(
+                dataset.getDataverseName(), dataset.getDatasetName(), secondaryIndex.getIndexName());
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessOperator,
+                splitsAndConstraints.second);
+    }
 
     @SuppressWarnings("rawtypes")
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(JobSpecification jobSpec,
@@ -730,142 +721,141 @@
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
-			List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-			JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
-			int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, Object implConfig)
-					throws AlgebricksException {
-		boolean isSecondary = true;
-		if(dataset.getDatasetType() == DatasetType.EXTERNAL){
-			try {
-				int numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
-				RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-				int numKeys = numPrimaryKeys;;
-				ITypeTraits[] typeTraits = null;
-				int[] bloomFilterKeyFields;
-				Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-						dataset.getDatasetName(), indexName);
-				int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
-				numKeys += numSecondaryKeys;
-				int keysStartIndex = outputVars.size() - numKeys;
-				typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
-				bloomFilterKeyFields = new int[numSecondaryKeys];
-				for (int i = 0; i < numSecondaryKeys; i++) {
-					bloomFilterKeyFields[i] = i;
-				}
-				IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-						outputVars, keysStartIndex, numKeys, typeEnv, context);
-				IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-				Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-				try {
-					spPc = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),
-							dataset.getDatasetName(), indexName);
-				} catch (Exception e) {
-					throw new AlgebricksException(e);
-				}
-				ISearchOperationCallbackFactory searchCallbackFactory = null;
-				searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
-				AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-				BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-	                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
-	                    typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
-	                    lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
-	                            new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
-	                            isSecondary ? new SecondaryIndexOperationTrackerProvider(
-	                                    LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
-	                                    : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
-	                            rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
-	                    searchCallbackFactory);
-				return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-			} catch (MetadataException me) {
-				throw new AlgebricksException(me);
-			}
-		}
-		else
-		{
-			try {
-				Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-						dataset.getDatasetName(), dataset.getDatasetName());
-				if (primaryIndex != null) {
-					isSecondary = !indexName.equals(primaryIndex.getIndexName());
-				}
-				int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-				RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-				int numKeys = numPrimaryKeys;
-				int keysStartIndex = outputRecDesc.getFieldCount() - numKeys - 1;
-				ITypeTraits[] typeTraits = null;
-				int[] bloomFilterKeyFields;
-				if (isSecondary) {
-					Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-							dataset.getDatasetName(), indexName);
-					int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
-					numKeys += numSecondaryKeys;
-					keysStartIndex = outputVars.size() - numKeys;
-					typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
-					bloomFilterKeyFields = new int[numSecondaryKeys];
-					for (int i = 0; i < numSecondaryKeys; i++) {
-						bloomFilterKeyFields[i] = i;
-					}
-				} else {
-					typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
-							context);
-					bloomFilterKeyFields = new int[numPrimaryKeys];
-					for (int i = 0; i < numPrimaryKeys; i++) {
-						bloomFilterKeyFields[i] = i;
-					}
-				}
-				IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-						outputVars, keysStartIndex, numKeys, typeEnv, context);
-
-				IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-				Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-				try {
-					spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
-							dataset.getDatasetName(), indexName);
-				} catch (Exception e) {
-					throw new AlgebricksException(e);
-				}
-
-				ISearchOperationCallbackFactory searchCallbackFactory = null;
-				if (isSecondary) {
-					searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
-				} else {
-					JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
-					int datasetId = dataset.getDatasetId();
-					int[] primaryKeyFields = new int[numPrimaryKeys];
-					for (int i = 0; i < numPrimaryKeys; i++) {
-						primaryKeyFields[i] = i;
-					}
-
-                AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
-                ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-                if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
-                    searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
-                } else {
-                    searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
+            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, Object implConfig)
+            throws AlgebricksException {
+        boolean isSecondary = true;
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            try {
+                int numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
+                RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+                int numKeys = numPrimaryKeys;;
+                ITypeTraits[] typeTraits = null;
+                int[] bloomFilterKeyFields;
+                Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName);
+                int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+                numKeys += numSecondaryKeys;
+                int keysStartIndex = outputVars.size() - numKeys;
+                typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
+                bloomFilterKeyFields = new int[numSecondaryKeys];
+                for (int i = 0; i < numSecondaryKeys; i++) {
+                    bloomFilterKeyFields[i] = i;
                 }
+                IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                        outputVars, keysStartIndex, numKeys, typeEnv, context);
+                IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+                try {
+                    spPc = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName);
+                } catch (Exception e) {
+                    throw new AlgebricksException(e);
+                }
+                ISearchOperationCallbackFactory searchCallbackFactory = null;
+                searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+                AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+                BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+                        lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
+                                new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
+                                isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+                                        : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
+                        searchCallbackFactory);
+                return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+            } catch (MetadataException me) {
+                throw new AlgebricksException(me);
             }
-            AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-            BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
-                    typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
-                    lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
-                            new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
-                            isSecondary ? new SecondaryIndexOperationTrackerProvider(
-                                    LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
-                                    : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
-                            rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
-                    searchCallbackFactory);
+        } else {
+            try {
+                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), dataset.getDatasetName());
+                if (primaryIndex != null) {
+                    isSecondary = !indexName.equals(primaryIndex.getIndexName());
+                }
+                int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+                RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+                int numKeys = numPrimaryKeys;
+                int keysStartIndex = outputRecDesc.getFieldCount() - numKeys - 1;
+                ITypeTraits[] typeTraits = null;
+                int[] bloomFilterKeyFields;
+                if (isSecondary) {
+                    Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName);
+                    int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+                    numKeys += numSecondaryKeys;
+                    keysStartIndex = outputVars.size() - numKeys;
+                    typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv,
+                            context);
+                    bloomFilterKeyFields = new int[numSecondaryKeys];
+                    for (int i = 0; i < numSecondaryKeys; i++) {
+                        bloomFilterKeyFields[i] = i;
+                    }
+                } else {
+                    typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
+                            context);
+                    bloomFilterKeyFields = new int[numPrimaryKeys];
+                    for (int i = 0; i < numPrimaryKeys; i++) {
+                        bloomFilterKeyFields[i] = i;
+                    }
+                }
+                IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                        outputVars, keysStartIndex, numKeys, typeEnv, context);
 
-				return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+                IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+                try {
+                    spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName);
+                } catch (Exception e) {
+                    throw new AlgebricksException(e);
+                }
 
-			} catch (MetadataException me) {
-				throw new AlgebricksException(me);
-			}
-		}
-	}
-    
+                ISearchOperationCallbackFactory searchCallbackFactory = null;
+                if (isSecondary) {
+                    searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+                } else {
+                    JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
+                    int datasetId = dataset.getDatasetId();
+                    int[] primaryKeyFields = new int[numPrimaryKeys];
+                    for (int i = 0; i < numPrimaryKeys; i++) {
+                        primaryKeyFields[i] = i;
+                    }
+
+                    AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
+                    ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+                    if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
+                        searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
+                                primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+                    } else {
+                        searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
+                                primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+                    }
+                }
+                AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+                BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+                        lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
+                                new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
+                                isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+                                        : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
+                        searchCallbackFactory);
+
+                return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+
+            } catch (MetadataException me) {
+                throw new AlgebricksException(me);
+            }
+        }
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] keyFields)
@@ -923,11 +913,11 @@
                             valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
                             new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
-                                    nestedKeyType.getTypeTag(), comparatorFactories.length),
-                            storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+                                    comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+                    retainInput, searchCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
 
         } catch (MetadataException me) {
@@ -1083,7 +1073,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
                     splitsAndConstraint.second);
@@ -1151,7 +1141,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                     .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
 
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1345,9 +1335,9 @@
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
                     new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                     .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
                     false);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
@@ -1473,10 +1463,9 @@
                     invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
                     new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(
-                                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                            LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, storageProperties
                                     .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
                     splitsAndConstraint.second);
@@ -1568,12 +1557,11 @@
                             valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
                             new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
-                                    nestedKeyType.getTypeTag(), comparatorFactories.length),
-                            storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
-                    modificationCallbackFactory, false);
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+                                    comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+                    filterFactory, modificationCallbackFactory, false);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
         } catch (MetadataException | IOException e) {
             throw new AlgebricksException(e);
@@ -1622,18 +1610,15 @@
 
         int numPartitions = 0;
         List<String> nodeGroup = null;
-        if(dataset.getDatasetType() == DatasetType.EXTERNAL)
-		{
-			ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-			nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
-					.getNodeNames();
-		}
-		else
-		{
-			InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
-			nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
-					.getNodeNames();
-		}
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
+                    .getNodeNames();
+        } else {
+            InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
+            nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
+                    .getNodeNames();
+        }
 
         for (String nd : nodeGroup) {
             numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
@@ -1653,11 +1638,11 @@
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForExternalDataset(
-			String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
-		FileSplit[] splits = splitsForExternalDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
-		return splitProviderAndPartitionConstraints(splits);
-	}
-    
+            String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
+        FileSplit[] splits = splitsForExternalDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
             String dataverse) {
         FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
@@ -1749,55 +1734,55 @@
     }
 
     private FileSplit[] splitsForExternalDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
-			String datasetName, String targetIdxName) throws AlgebricksException {
+            String datasetName, String targetIdxName) throws AlgebricksException {
 
-		try {
-			File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-			Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-			if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
-				throw new AlgebricksException("Not an external dataset");
-			}
-			ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-			List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
-					.getNodeNames();
-			if (nodeGroup == null) {
-				throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
-			}
+        try {
+            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
+                throw new AlgebricksException("Not an external dataset");
+            }
+            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
+            }
 
-			List<FileSplit> splitArray = new ArrayList<FileSplit>();
-			for (String nd : nodeGroup) {
-				String[] nodeStores = stores.get(nd);
-				if (nodeStores == null) {
-					LOGGER.warning("Node " + nd + " has no stores.");
-					throw new AlgebricksException("Node " + nd + " has no stores.");
-				} else {
-					int numIODevices;
-					if (datasetDetails.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
-						numIODevices = 1;
-					} else {
-						numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-					}
-					String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-					for (int j = 0; j < nodeStores.length; j++) {
-						for (int k = 0; k < numIODevices; k++) {
-							File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator
-									+ relPathFile);
-							splitArray.add(new FileSplit(nd, new FileReference(f), k));
-						}
-					}
-				}
-			}
-			FileSplit[] splits = new FileSplit[splitArray.size()];
-			int i = 0;
-			for (FileSplit fs : splitArray) {
-				splits[i++] = fs;
-			}
-			return splits;
-		} catch (MetadataException me) {
-			throw new AlgebricksException(me);
-		}
-	}
-    
+            List<FileSplit> splitArray = new ArrayList<FileSplit>();
+            for (String nd : nodeGroup) {
+                String[] nodeStores = stores.get(nd);
+                if (nodeStores == null) {
+                    LOGGER.warning("Node " + nd + " has no stores.");
+                    throw new AlgebricksException("Node " + nd + " has no stores.");
+                } else {
+                    int numIODevices;
+                    if (datasetDetails.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+                        numIODevices = 1;
+                    } else {
+                        numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+                    }
+                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
+                    for (int j = 0; j < nodeStores.length; j++) {
+                        for (int k = 0; k < numIODevices; k++) {
+                            File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator
+                                    + relPathFile);
+                            splitArray.add(new FileSplit(nd, new FileReference(f), k));
+                        }
+                    }
+                }
+            }
+            FileSplit[] splits = new FileSplit[splitArray.size()];
+            int i = 0;
+            for (FileSplit fs : splitArray) {
+                splits[i++] = fs;
+            }
+            return splits;
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
     private static Map<String, String> initializeAdapterFactoryMapping() {
         Map<String, String> adapterFactoryMapping = new HashMap<String, String>();
         adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter",
@@ -1811,7 +1796,7 @@
         adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter",
                 "edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
         adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter",
-				"edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory");
+                "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory");
         return adapterFactoryMapping;
     }
 
@@ -1910,4 +1895,3 @@
     }
 
 }
-
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index aec378b..140a8dd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -18,7 +18,6 @@
 import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 
@@ -27,10 +26,8 @@
     private static final long serialVersionUID = 1L;
 
     private final int datasetID;
-    private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
 
-    public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
-        this.ioOpCallbackFactory = ioOpCallbackFactory;
+    public SecondaryIndexOperationTrackerProvider(int datasetID) {
         this.datasetID = datasetID;
     }
 
@@ -38,7 +35,7 @@
     public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
         DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
                 .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
-        return new BaseOperationTracker(dslcManager, ioOpCallbackFactory, datasetID);
+        return new BaseOperationTracker(dslcManager, datasetID);
     }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index d243dd2..cf60182 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -55,10 +55,11 @@
         LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
                 .getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
                 bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
-                        .getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
-                        : new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                        .getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
+                        .getLSMMergePolicy(),
+                isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
+                        .createIOOperationCallback());
         return lsmBTree;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 8482172..5ed7019 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -59,25 +59,37 @@
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
         try {
             if (isPartitioned) {
-                return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
-                        .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
-                        tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
-                        runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
-                                .getLSMMergePolicy(), new BaseOperationTracker(
-                                (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                                .getLSMIOScheduler(), runtimeContextProvider
-                                .getLSMInvertedIndexIOOperationCallbackProvider());
+                return InvertedIndexUtils.createPartitionedLSMInvertedIndex(
+                        virtualBufferCaches,
+                        runtimeContextProvider.getFileMapManager(),
+                        invListTypeTraits,
+                        invListCmpFactories,
+                        tokenTypeTraits,
+                        tokenCmpFactories,
+                        tokenizerFactory,
+                        runtimeContextProvider.getBufferCache(),
+                        filePath,
+                        runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+                        runtimeContextProvider.getLSMMergePolicy(),
+                        new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+                                .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
             } else {
-                return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
-                        .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
-                        tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
-                        runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
-                                .getLSMMergePolicy(), new BaseOperationTracker(
-                                (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                                .getLSMIOScheduler(), runtimeContextProvider
-                                .getLSMInvertedIndexIOOperationCallbackProvider());
+                return InvertedIndexUtils.createLSMInvertedIndex(
+                        virtualBufferCaches,
+                        runtimeContextProvider.getFileMapManager(),
+                        invListTypeTraits,
+                        invListCmpFactories,
+                        tokenTypeTraits,
+                        tokenCmpFactories,
+                        tokenizerFactory,
+                        runtimeContextProvider.getBufferCache(),
+                        filePath,
+                        runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+                        runtimeContextProvider.getLSMMergePolicy(),
+                        new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+                                .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
             }
         } catch (IndexException e) {
             throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index bc1e889..dc6b30b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -66,10 +66,9 @@
                     runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
                     valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                     runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
-                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                            LSMRTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                            .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
-                    linearizeCmpFactory);
+                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                    runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
+                            .createIOOperationCallback(), linearizeCmpFactory);
         } catch (TreeIndexException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index 6f6da4a..dca14d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -16,8 +16,8 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -70,7 +70,8 @@
         long firstLSN;
         if (openIndexList.size() > 0) {
             for (IIndex index : openIndexList) {
-                firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+                firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+                        .getFirstLSN();
                 minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
             }
         } else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2ad3055..d0e9348 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -38,7 +38,6 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -297,10 +296,8 @@
 
                             //#. get maxDiskLastLSN
                             ILSMIndex lsmIndex = (ILSMIndex) index;
-                            BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
-                            AbstractLSMIOOperationCallback abstractLSMIOCallback = (AbstractLSMIOOperationCallback) indexOpTracker
-                                    .getIOOperationCallback();
-                            maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
+                            maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+                                    .getComponentLSN(lsmIndex.getImmutableComponents());
 
                             //#. set resourceId and maxDiskLastLSN to the map
                             resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
@@ -375,9 +372,8 @@
                 ILSMIndex lsmIndex = (ILSMIndex) index;
                 ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
-                BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
                 BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
-                        indexOpTracker.getIOOperationCallback());
+                        lsmIndex.getIOOperationCallback());
                 callbackList.add(cb);
                 try {
                     indexAccessor.scheduleFlush(cb);
@@ -399,7 +395,8 @@
             minMCTFirstLSN = Long.MAX_VALUE;
             if (openIndexList.size() > 0) {
                 for (IIndex index : openIndexList) {
-                    firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+                    firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+                            .getFirstLSN();
                     minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
                 }
             } else {
@@ -614,8 +611,8 @@
                     loserTxnTable.remove(tempKeyTxnId);
                     if (IS_DEBUG_MODE) {
                         entityCommitLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
-                                + tempKeyTxnId);
+                        System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN
+                                + "]" + tempKeyTxnId);
                     }
                     break;
 
@@ -721,7 +718,7 @@
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }
-            ((BaseOperationTracker) index.getOperationTracker()).updateLastLSN(logRecord.getLSN());
+            ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()).updateLastLSN(logRecord.getLSN());
         } catch (Exception e) {
             throw new IllegalStateException("Failed to redo", e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 59a8363..5d4806d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -15,15 +15,11 @@
 package edu.uci.ics.asterix.transaction.management.service.transaction;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -33,18 +29,12 @@
 import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
 
 public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
-        ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider,
-        ILSMIOOperationCallbackProvider {
+        ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider {
     private static final long serialVersionUID = 1L;
 
     public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
-    
-    private AsterixRuntimeComponentsProvider() {
-    }
 
-    @Override
-    public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index) {
-        return ((BaseOperationTracker) index.getOperationTracker()).getIOOperationCallback();
+    private AsterixRuntimeComponentsProvider() {
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 678956b..c54cb7f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -21,9 +21,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
@@ -71,7 +71,7 @@
 
     //indexMap is concurrently accessed by multiple threads, 
     //so those threads are synchronized on indexMap object itself
-    private Map<MutableLong, BaseOperationTracker> indexMap;
+    private Map<MutableLong, AbstractLSMIOOperationCallback> indexMap;
 
     //TODO: fix ComponentLSNs' issues. 
     //primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be modified accordingly
@@ -97,7 +97,7 @@
         isTimeout = false;
         isWriteTxn = new AtomicBoolean(false);
         isMetadataTxn = false;
-        indexMap = new HashMap<MutableLong, BaseOperationTracker>();
+        indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
         primaryIndex = null;
         tempResourceIdForRegister = new MutableLong();
         tempResourceIdForSetLSN = new MutableLong();
@@ -114,7 +114,8 @@
             }
             tempResourceIdForRegister.set(resourceId);
             if (!indexMap.containsKey(tempResourceIdForRegister)) {
-                indexMap.put(new MutableLong(resourceId), ((BaseOperationTracker) index.getOperationTracker()));
+                indexMap.put(new MutableLong(resourceId),
+                        ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()));
             }
         }
     }
@@ -122,16 +123,14 @@
     //[Notice] 
     //This method is called sequentially by the LogAppender threads. 
     //However, the indexMap is concurrently read and modified through this method and registerIndexAndCallback()
-    //TODO: fix issues - 591, 609, 612, and 614.
     @Override
     public void setLastLSN(long resourceId, long LSN) {
         synchronized (indexMap) {
             firstLSN.compareAndSet(-1, LSN);
             lastLSN.set(Math.max(lastLSN.get(), LSN));
             tempResourceIdForSetLSN.set(resourceId);
-            //TODO; create version number tracker and keep LSNs there. 
-            BaseOperationTracker opTracker = indexMap.get(tempResourceIdForSetLSN);
-            opTracker.updateLastLSN(LSN);
+            AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
+            ioOpCallback.updateLastLSN(LSN);
         }
     }