Added an index creator class hierarchy (SecondaryIndexCreator plus per-index-type subclasses) so the create-index job-spec builders can share code. Still needs some cleanup.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@197 eaa15691-b419-025a-1212-ee371bd00084
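
Editor's note: the per-type switch in IndexOperations.buildCreateIndexJobSpec is replaced by a factory method on the new SecondaryIndexCreator base class. A minimal sketch of the new call path (names as they appear in the patch below; physOptConf stands in for whatever PhysicalOptimizationConfig the caller holds):

    SecondaryIndexCreator creator = SecondaryIndexCreator.createIndexCreator(
            createIndexStmt.getIndexType(), physOptConf);
    JobSpecification spec = creator.createJobSpec(createIndexStmt, metadata);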
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index c0053fb..39b9a3a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -1,69 +1,25 @@
 package edu.uci.ics.asterix.file;
 
-import java.io.DataOutput;
-import java.util.List;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.aql.translator.DdlTranslator.CompiledIndexDropStatement;
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
 import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
 import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
 import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
-import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 public class IndexOperations {
@@ -74,31 +30,9 @@
     private static final Logger LOGGER = Logger.getLogger(IndexOperations.class.getName());
 
     public static JobSpecification buildCreateIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
-
-        switch (createIndexStmt.getIndexType()) {
-            case BTREE: {
-                return createBtreeIndexJobSpec(createIndexStmt, datasetDecls);
-            }
-
-            case RTREE: {
-                return createRtreeIndexJobSpec(createIndexStmt, datasetDecls);
-            }
-
-            case KEYWORD: {
-                return createKeywordIndexJobSpec(createIndexStmt, datasetDecls);
-            }
-
-            case QGRAM: {
-                // return createQgramIndexJobSpec(createIndexStmt,
-                // datasetDecls);
-            }
-
-            default: {
-                throw new AsterixException("Unknown Index Type: " + createIndexStmt.getIndexType());
-            }
-
-        }
+            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        SecondaryIndexCreator secondaryIndexCreator = SecondaryIndexCreator.createIndexCreator(createIndexStmt.getIndexType(), physicalOptimizationConfig);
+        return secondaryIndexCreator.createJobSpec(createIndexStmt, metadata);
     }
 
     public static JobSpecification createSecondaryIndexDropJobSpec(CompiledIndexDropStatement deleteStmt,
@@ -121,595 +55,6 @@
         return spec;
     }
 
-	public static AbstractOperatorDescriptor createDummyKeyProviderOp(
-			JobSpecification spec) throws AsterixException, AlgebricksException {
-		// Build dummy tuple containing one field with a dummy value inside.
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-		DataOutput dos = tb.getDataOutput();
-		tb.reset();
-		try {
-			// Serialize dummy value into a field.
-			IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-		} catch (HyracksDataException e) {
-			throw new AsterixException(e);
-		}
-		// Add dummy field.
-		tb.addFieldEndOffset();
-		ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		return keyProviderOp;
-	}
-    
-	public static BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
-			JobSpecification spec, AqlCompiledMetadataDeclarations metadata,
-			AqlCompiledDatasetDecl compiledDatasetDecl, ARecordType itemType,
-			ISerializerDeserializer payloadSerde,
-			IFileSplitProvider splitProvider) throws AlgebricksException,
-			MetadataException {
-		int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(
-				compiledDatasetDecl).size();
-		ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];		
-		ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-		IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];		
-		ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
-				.getSerdeProvider();
-		List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-				.getPartitioningFunctions(compiledDatasetDecl);
-		int i = 0;
-		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
-			IAType keyType = evalFactoryAndType.third;
-			ISerializerDeserializer keySerde = serdeProvider
-					.getSerializerDeserializer(keyType);
-			primaryRecFields[i] = keySerde;
-			primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(keyType, OrderKind.ASC);
-			primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
-					.getTypeTrait(keyType);
-			++i;
-		}
-		primaryRecFields[numPrimaryKeys] = payloadSerde;
-		primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
-				.getTypeTrait(itemType);
-
-		// -Infinity
-		int[] lowKeyFields = null;
-		// +Infinity
-		int[] highKeyFields = null;
-		RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
-
-		BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, AsterixStorageManagerInterface.INSTANCE,
-				AsterixTreeRegistryProvider.INSTANCE, splitProvider,
-				primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
-				highKeyFields, true, true, new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-		return primarySearchOp;
-	}
-    
-    
-    @SuppressWarnings("unchecked")
-    public static JobSpecification createBtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
-        JobSpecification spec = new JobSpecification();
-        String datasetName = createIndexStmt.getDatasetName();
-        String secondaryIndexName = createIndexStmt.getIndexName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
-        }
-        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
-        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(itemType);
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-        
-        // Create dummy key provider for feeding the primary index scan. 
-		AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, keyProviderOp, splitProviderAndConstraint.second);
-        
-        // Create primary index scan op.
-		BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
-				spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
-				splitProviderAndConstraint.first);
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, keyProviderOp, splitProviderAndConstraint.second);
-		
-        // ---------- START ASSIGN OP
-
-        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-        int numSecondaryKeys = secondaryKeyFields.size();
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
-                + numPrimaryKeys];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
-                .getSerdeProvider();
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
-                    numPrimaryKeys);
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-        // Fill in serializers and comparators for primary index fields.
-        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
-        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-        }
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            outColumns[i] = numPrimaryKeys + i + 1;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i + 1;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                splitProviderAndConstraint.second);
-
-        // ---------- END ASSIGN OP
-
-        // ---------- START EXTERNAL SORT OP
-
-        int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
-            sortFields[i] = i;
-        }
-        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
-                secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
-                splitProviderAndConstraint.second);
-
-        // ---------- END EXTERNAL SORT OP
-
-        // ---------- START SECONDARY INDEX BULK LOAD
-
-        int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
-            fieldPermutation[i] = i;
-        }
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
-        // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
-		TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
-				secondarySplitsAndConstraint.first, secondaryTypeTraits,
-				secondaryComparatorFactories, fieldPermutation, 0.7f,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
-                secondarySplitsAndConstraint.second);
-
-        // ---------- END SECONDARY INDEX BULK LOAD
-
-        // Connect the operators.
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-        spec.addRoot(secondaryBulkLoadOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-
-    }
-
-    @SuppressWarnings("unchecked")
-    public static JobSpecification createRtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
-        JobSpecification spec = new JobSpecification();
-        String datasetName = createIndexStmt.getDatasetName();
-        String secondaryIndexName = createIndexStmt.getIndexName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
-        }
-        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
-        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(itemType);
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-        
-        // Create dummy key provider for feeding the primary index scan. 
-        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, splitProviderAndConstraint.second);
-        
-        // Create primary index scan op.
-        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
-                spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
-                splitProviderAndConstraint.first);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, splitProviderAndConstraint.second);
-
-        // ---------- START ASSIGN OP
-
-        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-        int numSecondaryKeys = secondaryKeyFields.size();
-
-        if (numSecondaryKeys != 1) {
-            throw new AsterixException(
-                    "Cannot use "
-                            + numSecondaryKeys
-                            + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
-        }
-
-        IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-        }
-
-        int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numNestedSecondaryKeyFields = dimension * 2;
-
-        IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
-                numPrimaryKeys, dimension);
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-                + numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-
-        IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    nestedKeyType, OrderKind.ASC);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-        }
-
-        // Fill in serializers and comparators for primary index fields.
-        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
-        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
-        }
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
-        int[] outColumns = new int[numNestedSecondaryKeyFields];
-        int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            outColumns[i] = numPrimaryKeys + i + 1;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i + 1;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                splitProviderAndConstraint.second);
-
-        // ---------- END ASSIGN OP
-
-        // ---------- START SECONDARY INDEX BULK LOAD
-
-        int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-        for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
-            fieldPermutation[i] = i;
-        }
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
-		TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
-				secondarySplitsAndConstraint.first, secondaryTypeTraits,
-				secondaryComparatorFactories, fieldPermutation, 0.7f,
-				new RTreeDataflowHelperFactory(valueProviderFactories),
-				NoOpOperationCallbackProvider.INSTANCE);
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
-                secondarySplitsAndConstraint.second);
-
-        // ---------- END SECONDARY INDEX BULK LOAD
-
-        // Connect the operators.
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
-        spec.addRoot(secondaryBulkLoadOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-
-    }
-
-    @SuppressWarnings("unchecked")
-    public static JobSpecification createKeywordIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
-
-        JobSpecification spec = new JobSpecification();
-
-        String primaryIndexName = createIndexStmt.getDatasetName();
-        String secondaryIndexName = createIndexStmt.getIndexName();
-
-        AqlCompiledDatasetDecl compiledDatasetDecl = datasetDecls.findDataset(primaryIndexName);
-        if (compiledDatasetDecl == null) {
-            throw new AsterixException("Could not find dataset " + primaryIndexName);
-        }
-
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
-        }
-        ARecordType itemType = (ARecordType) datasetDecls.findType(compiledDatasetDecl.getItemTypeName());
-        ISerializerDeserializerProvider serdeProvider = datasetDecls.getFormat().getSerdeProvider();
-        ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
-
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
-        // sanity
-        if (numPrimaryKeys > 1)
-            throw new AsterixException("Cannot create inverted keyword index on dataset with composite primary key.");
-
-        // sanity
-        IAType fieldsToTokenizeType = AqlCompiledIndexDecl
-                .keyFieldType(createIndexStmt.getKeyFields().get(0), itemType);
-        for (String fieldName : createIndexStmt.getKeyFields()) {
-            IAType nextFieldToTokenizeType = AqlCompiledIndexDecl.keyFieldType(fieldName, itemType);
-            if (nextFieldToTokenizeType.getTypeTag() != fieldsToTokenizeType.getTypeTag()) {
-                throw new AsterixException(
-                        "Cannot create inverted keyword index. Fields to tokenize must be of the same type.");
-            }
-        }
-
-        // ---------- START GENERAL BTREE STUFF
-
-        IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
-        IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
-
-        // ---------- END GENERAL BTREE STUFF
-
-        // ---------- START KEY PROVIDER OP
-
-        // TODO: should actually be empty tuple source
-        // build tuple containing low and high search keys
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
-        DataOutput dos = tb.getDataOutput();
-
-        try {
-            tb.reset();
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); // dummy
-            // field
-            tb.addFieldEndOffset();
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = datasetDecls
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                keyProviderSplitsAndConstraint.second);
-
-        // ---------- END KEY PROVIDER OP
-
-        // ---------- START PRIMARY INDEX SCAN
-
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-        int i = 0;
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl)) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            primaryRecFields[i] = keySerde;
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
-        ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
-                .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
-        ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
-                .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
-
-        int[] lowKeyFields = null; // -infinity
-        int[] highKeyFields = null; // +infinity
-        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = datasetDecls
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
-        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
-                highKeyFields, true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primarySplitsAndConstraint.second);
-
-        // ---------- END PRIMARY INDEX SCAN
-
-        // ---------- START ASSIGN OP
-
-        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-        int numSecondaryKeys = secondaryKeyFields.size();
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
-        for (i = 0; i < numSecondaryKeys; i++) {
-            evalFactories[i] = datasetDecls.getFormat().getFieldAccessEvaluatorFactory(itemType,
-                    secondaryKeyFields.get(i), numPrimaryKeys);
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            secondaryRecFields[i] = keySerde;
-        }
-        // fill in serializers and comparators for primary index fields
-        for (i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
-        }
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (i = 0; i < numSecondaryKeys; i++) {
-            outColumns[i] = numPrimaryKeys + i + 1;
-        }
-        int projCount = 0;
-        for (i = 0; i < numSecondaryKeys; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i + 1;
-        }
-        for (i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = datasetDecls
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                assignSplitsAndConstraint.second);
-
-        // ---------- END ASSIGN OP
-
-        // ---------- START TOKENIZER OP
-
-        int numTokenKeyPairFields = numPrimaryKeys + 1;
-
-        ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
-        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(fieldsToTokenizeType);
-        for (i = 0; i < numPrimaryKeys; i++)
-            tokenKeyPairFields[i + 1] = secondaryRecFields[numSecondaryKeys + i];
-        RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields);
-
-        int[] fieldsToTokenize = new int[numSecondaryKeys];
-        for (i = 0; i < numSecondaryKeys; i++)
-            fieldsToTokenize[i] = i;
-
-        int[] primaryKeyFields = new int[numPrimaryKeys];
-        for (i = 0; i < numPrimaryKeys; i++)
-            primaryKeyFields[i] = numSecondaryKeys + i;
-
-        IBinaryTokenizerFactory tokenizerFactory = AqlBinaryTokenizerFactoryProvider.INSTANCE
-                .getTokenizerFactory(fieldsToTokenizeType);
-        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenKeyPairRecDesc, tokenizerFactory, fieldsToTokenize, primaryKeyFields);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = datasetDecls
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
-                secondarySplitsAndConstraint.second);
-
-        // ---------- END TOKENIZER OP
-
-        // ---------- START EXTERNAL SORT OP
-
-        IBinaryComparatorFactory[] tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
-        tokenKeyPairComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                fieldsToTokenizeType, OrderKind.ASC);
-        for (i = 0; i < numPrimaryKeys; i++)
-            tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
-
-        int[] sortFields = new int[numTokenKeyPairFields]; // <token, primary
-        // key a, primary
-        // key b, etc.>
-        for (i = 0; i < numTokenKeyPairFields; i++)
-            sortFields[i] = i;
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = datasetDecls
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
-        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories,
-                secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
-                sorterSplitsAndConstraint.second);
-
-        // ---------- END EXTERNAL SORT OP
-
-        // ---------- START SECONDARY INDEX BULK LOAD
-
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numTokenKeyPairFields];
-        secondaryTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(fieldsToTokenizeType);
-        for (i = 0; i < numPrimaryKeys; i++)
-            secondaryTypeTraits[i + 1] = primaryTypeTraits[i];
-
-        int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
-        for (i = 0; i < numTokenKeyPairFields; i++)
-            fieldPermutation[i] = i;
-
-		TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, treeRegistryProvider,
-				secondarySplitsAndConstraint.first, secondaryTypeTraits,
-				tokenKeyPairComparatorFactories, fieldPermutation, 0.7f,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
-                secondarySplitsAndConstraint.second);
-
-        // ---------- END SECONDARY INDEX BULK LOAD
-
-        // ---------- START CONNECT THE OPERATORS
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-
-        spec.addRoot(secondaryBulkLoadOp);
-
-        // ---------- END CONNECT THE OPERATORS
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
     public static void main(String[] args) throws Exception {
         String host;
         String appName;
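
Editor's note: both the removed code above and the creators below build the assign op's projection the same way: the freshly computed secondary keys are projected first, then the primary keys, and the +1 skips the record payload that the primary scan emits after its key fields. A self-contained sketch of that arithmetic, assuming one primary key and two secondary keys:

    import java.util.Arrays;

    public class ProjectionSketch {
        public static void main(String[] args) {
            int numPrimaryKeys = 1;   // primary scan output: [pk0, payload, sk0, sk1]
            int numSecondaryKeys = 2;
            int[] outColumns = new int[numSecondaryKeys];
            int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
            for (int i = 0; i < numSecondaryKeys; i++) {
                // Secondary keys are appended after the payload, hence the +1.
                outColumns[i] = numPrimaryKeys + i + 1;
            }
            int projCount = 0;
            for (int i = 0; i < numSecondaryKeys; i++) {
                projectionList[projCount++] = numPrimaryKeys + i + 1; // secondary keys first
            }
            for (int i = 0; i < numPrimaryKeys; i++) {
                projectionList[projCount++] = i; // then primary keys
            }
            System.out.println(Arrays.toString(outColumns));     // [2, 3]
            System.out.println(Arrays.toString(projectionList)); // [2, 3, 0]
        }
    }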
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
new file mode 100644
index 0000000..0bfb209
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -0,0 +1,183 @@
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryBTreeCreator extends SecondaryIndexCreator {
+    
+    protected SecondaryBTreeCreator(PhysicalOptimizationConfig physOptConf) {
+        super(physOptConf);
+    }
+
+    @Override
+    public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        JobSpecification spec = new JobSpecification();
+        String datasetName = createIndexStmt.getDatasetName();
+        String secondaryIndexName = createIndexStmt.getIndexName();
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+        }
+        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(itemType);
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+        
+        // Create dummy key provider for feeding the primary index scan. 
+        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, keyProviderOp, splitProviderAndConstraint.second);
+        
+        // Create primary index scan op.
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+                spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+                splitProviderAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, keyProviderOp, splitProviderAndConstraint.second);
+        
+        // ---------- START ASSIGN OP
+
+        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+                + numPrimaryKeys];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+                .getSerdeProvider();
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+                    numPrimaryKeys);
+            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+        // Fill in serializers and comparators for primary index fields.
+        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+        }
+        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+        int[] outColumns = new int[numSecondaryKeys];
+        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            outColumns[i] = numPrimaryKeys + i + 1;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i + 1;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                splitProviderAndConstraint.second);
+
+        // ---------- END ASSIGN OP
+
+        // ---------- START EXTERNAL SORT OP
+
+        int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
+                secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+                splitProviderAndConstraint.second);
+
+        // ---------- END EXTERNAL SORT OP
+
+        // ---------- START SECONDARY INDEX BULK LOAD
+
+        int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+            fieldPermutation[i] = i;
+        }
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+
+        // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+                spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+                secondarySplitsAndConstraint.first, secondaryTypeTraits,
+                secondaryComparatorFactories, fieldPermutation, 0.7f,
+                new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+                secondarySplitsAndConstraint.second);
+
+        // ---------- END SECONDARY INDEX BULK LOAD
+
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+        spec.addRoot(secondaryBulkLoadOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+
+    }
+}
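
Editor's note: SecondaryBTreeCreator.createJobSpec above still inlines the dataset validation that SecondaryIndexCreator.init() (next file) centralizes; presumably that is the cleanup the commit message refers to. A hypothetical subclass showing the intended division of labor (the metadata field assignment is an assumption; the truncated base class does not show where that field gets set):

    package edu.uci.ics.asterix.file;

    import edu.uci.ics.asterix.common.exceptions.AsterixException;
    import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
    import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
    import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
    import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

    public class SecondaryIndexCreatorSketch extends SecondaryIndexCreator {

        protected SecondaryIndexCreatorSketch(PhysicalOptimizationConfig physOptConf) {
            super(physOptConf);
        }

        @Override
        public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
                AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
            this.metadata = metadata; // assumption: the base field is set before init() runs
            // Shared validation/setup: dataset exists, is not EXTERNAL, item type,
            // primary key count, and split provider are resolved once, here.
            init(createIndexStmt);
            JobSpecification spec = new JobSpecification();
            // Shared operator builders from the base class do the rest.
            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
            spec.addRoot(keyProviderOp);
            return spec;
        }
    }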
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
new file mode 100644
index 0000000..fb17ddc
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -0,0 +1,171 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+
+@SuppressWarnings("rawtypes")
+public abstract class SecondaryIndexCreator {
+    protected final PhysicalOptimizationConfig physOptConf;
+    
+    protected int numPrimaryKeys;
+    protected int numSecondaryKeys;
+    protected AqlCompiledMetadataDeclarations metadata;
+    protected String datasetName;
+    protected ARecordType itemType;    
+    protected ISerializerDeserializer payloadSerde;
+    protected IFileSplitProvider primaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+    protected String secondaryIndexName;
+    
+    // Prevent public construction.
+    protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf) {
+        this.physOptConf = physOptConf;
+    }
+
+    public static SecondaryIndexCreator createIndexCreator(IndexType indexType, PhysicalOptimizationConfig physOptConf) throws AsterixException {
+        switch (indexType) {
+            case BTREE: {
+                return new SecondaryBTreeCreator(physOptConf);
+            }
+            case RTREE: {
+                return new SecondaryRTreeCreator(physOptConf);
+            }
+            case KEYWORD: {
+                return new SecondaryInvertedIndexCreator(physOptConf);
+            }
+            default: {
+                throw new AsterixException("Unknown Index Type: " + indexType);
+            }
+        }
+    }
+    
+    public abstract JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException;
+    
+    protected void init(CompiledCreateIndexStatement createIndexStmt) throws AsterixException, AlgebricksException {
+        datasetName = createIndexStmt.getDatasetName();
+        secondaryIndexName = createIndexStmt.getIndexName();
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AsterixException("Unknown dataset " + datasetName);
+        }
+        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+        }
+        itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(itemType);
+        numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+        primaryFileSplitProvider = splitProviderAndConstraint.first;
+        primaryPartitionConstraint = splitProviderAndConstraint.second;
+    }
+    
+    protected AbstractOperatorDescriptor createDummyKeyProviderOp(
+            JobSpecification spec) throws AsterixException, AlgebricksException {
+        // Build dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
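+        // The constant tuple source below emits this one tuple, which merely
+        // triggers the downstream primary index scan.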
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
+                spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+                tb.getSize());
+        return keyProviderOp;
+    }
+    
+    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
+            JobSpecification spec, AqlCompiledMetadataDeclarations metadata,
+            AqlCompiledDatasetDecl compiledDatasetDecl, ARecordType itemType,
+            ISerializerDeserializer payloadSerde,
+            IFileSplitProvider splitProvider) throws AlgebricksException,
+            MetadataException {
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(
+                compiledDatasetDecl).size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];       
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];       
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+                .getSerdeProvider();
+        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl);
+        int i = 0;
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = serdeProvider
+                    .getSerializerDeserializer(keyType);
+            primaryRecFields[i] = keySerde;
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
+                    .getTypeTrait(keyType);
+            ++i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+                .getTypeTrait(itemType);
+
+        // Search with null low/high keys and inclusive bounds, i.e., a full
+        // scan of the primary index from -infinity to +infinity.
+        int[] lowKeyFields = null;
+        int[] highKeyFields = null;
+        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
+                spec, primaryRecDesc, AsterixStorageManagerInterface.INSTANCE,
+                AsterixTreeRegistryProvider.INSTANCE, splitProvider,
+                primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        return primarySearchOp;
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
new file mode 100644
index 0000000..4eb981d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -0,0 +1,308 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
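+/**
+ * Builds a secondary inverted keyword index. The job pipeline is:
+ * dummy key provider -> primary index scan -> assign (extract the fields to
+ * tokenize) -> tokenizer -> external sort -> B-tree bulk load of
+ * <token, primary key> pairs.
+ */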
+@SuppressWarnings("rawtypes")
+public class SecondaryInvertedIndexCreator extends SecondaryIndexCreator {
+    
+    protected SecondaryInvertedIndexCreator(PhysicalOptimizationConfig physOptConf) {
+        super(physOptConf);
+    }
+
+    @Override
+    public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        JobSpecification spec = new JobSpecification();
+
+        String primaryIndexName = createIndexStmt.getDatasetName();
+        String secondaryIndexName = createIndexStmt.getIndexName();
+
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(primaryIndexName);
+        if (compiledDatasetDecl == null) {
+            throw new AsterixException("Could not find dataset " + primaryIndexName);
+        }
+
+        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
+        }
+        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+        ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
+
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+        // Sanity check: composite primary keys are not supported here.
+        if (numPrimaryKeys > 1) {
+            throw new AsterixException("Cannot create inverted keyword index on dataset with composite primary key.");
+        }
+
+        // Sanity check: all fields to be tokenized must have the same type.
+        IAType fieldsToTokenizeType = AqlCompiledIndexDecl
+                .keyFieldType(createIndexStmt.getKeyFields().get(0), itemType);
+        for (String fieldName : createIndexStmt.getKeyFields()) {
+            IAType nextFieldToTokenizeType = AqlCompiledIndexDecl.keyFieldType(fieldName, itemType);
+            if (nextFieldToTokenizeType.getTypeTag() != fieldsToTokenizeType.getTypeTag()) {
+                throw new AsterixException(
+                        "Cannot create inverted keyword index. Fields to tokenize must be of the same type.");
+            }
+        }
+
+        // ---------- START GENERAL BTREE PROVIDERS
+
+        IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+        // ---------- END GENERAL BTREE PROVIDERS
+
+        // ---------- START KEY PROVIDER OP
+
+        // TODO: Should use an empty tuple source; for now, build a single dummy
+        // tuple to trigger the primary index scan.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
+        DataOutput dos = tb.getDataOutput();
+
+        try {
+            tb.reset();
+            // Serialize a dummy integer into the single field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+            tb.addFieldEndOffset();
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+                keyProviderSplitsAndConstraint.second);
+
+        // ---------- END KEY PROVIDER OP
+
+        // ---------- START PRIMARY INDEX SCAN
+
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        int i = 0;
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl)) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            primaryRecFields[i] = keySerde;
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+
+        int[] lowKeyFields = null; // -infinity
+        int[] highKeyFields = null; // +infinity
+        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+                primarySplitsAndConstraint.second);
+
+        // ---------- END PRIMARY INDEX SCAN
+
+        // ---------- START ASSIGN OP
+
+        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+        for (i = 0; i < numSecondaryKeys; i++) {
+            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType,
+                    secondaryKeyFields.get(i), numPrimaryKeys);
+            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+        }
+        // Fill in serializers for the primary key fields.
+        for (i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
+        }
+        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
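+        // The assign op appends the extracted secondary key fields after the
+        // primary scan output and projects <secondary keys, primary keys>,
+        // dropping the record payload.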
+        int[] outColumns = new int[numSecondaryKeys];
+        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+        for (i = 0; i < numSecondaryKeys; i++) {
+            outColumns[i] = numPrimaryKeys + i + 1;
+        }
+        int projCount = 0;
+        for (i = 0; i < numSecondaryKeys; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i + 1;
+        }
+        for (i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                assignSplitsAndConstraint.second);
+
+        // ---------- END ASSIGN OP
+
+        // ---------- START TOKENIZER OP
+
+        int numTokenKeyPairFields = numPrimaryKeys + 1;
+
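+        // The tokenizer emits one <token, primary key> tuple per token extracted
+        // from the fields to tokenize, carrying the primary key along.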
+        ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(fieldsToTokenizeType);
+        for (i = 0; i < numPrimaryKeys; i++) {
+            tokenKeyPairFields[i + 1] = secondaryRecFields[numSecondaryKeys + i];
+        }
+        RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields);
+
+        int[] fieldsToTokenize = new int[numSecondaryKeys];
+        for (i = 0; i < numSecondaryKeys; i++) {
+            fieldsToTokenize[i] = i;
+        }
+
+        int[] primaryKeyFields = new int[numPrimaryKeys];
+        for (i = 0; i < numPrimaryKeys; i++) {
+            primaryKeyFields[i] = numSecondaryKeys + i;
+        }
+
+        IBinaryTokenizerFactory tokenizerFactory = AqlBinaryTokenizerFactoryProvider.INSTANCE
+                .getTokenizerFactory(fieldsToTokenizeType);
+        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
+                tokenKeyPairRecDesc, tokenizerFactory, fieldsToTokenize, primaryKeyFields);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
+                secondarySplitsAndConstraint.second);
+
+        // ---------- END TOKENIZER OP
+
+        // ---------- START EXTERNAL SORT OP
+
+        IBinaryComparatorFactory[] tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
+        tokenKeyPairComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                fieldsToTokenizeType, OrderKind.ASC);
+        for (i = 0; i < numPrimaryKeys; i++) {
+            tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
+        }
+
+        // Sort on <token, primary key a, primary key b, etc.>.
+        int[] sortFields = new int[numTokenKeyPairFields];
+        for (i = 0; i < numTokenKeyPairFields; i++) {
+            sortFields[i] = i;
+        }
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories,
+                tokenKeyPairRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+                sorterSplitsAndConstraint.second);
+
+        // ---------- END EXTERNAL SORT OP
+
+        // ---------- START SECONDARY INDEX BULK LOAD
+
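+        // The bulk load expects its input in index order, which the preceding
+        // external sort guarantees; 0.7f below is the B-tree fill factor.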
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+        secondaryTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(fieldsToTokenizeType);
+        for (i = 0; i < numPrimaryKeys; i++) {
+            secondaryTypeTraits[i + 1] = primaryTypeTraits[i];
+        }
+
+        int[] fieldPermutation = new int[numTokenKeyPairFields];
+        for (i = 0; i < numTokenKeyPairFields; i++) {
+            fieldPermutation[i] = i;
+        }
+
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+                spec, storageManager, treeRegistryProvider,
+                secondarySplitsAndConstraint.first, secondaryTypeTraits,
+                tokenKeyPairComparatorFactories, fieldPermutation, 0.7f,
+                new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+                secondarySplitsAndConstraint.second);
+
+        // ---------- END SECONDARY INDEX BULK LOAD
+
+        // ---------- START CONNECT THE OPERATORS
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+
+        spec.addRoot(secondaryBulkLoadOp);
+
+        // ---------- END CONNECT THE OPERATORS
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
new file mode 100644
index 0000000..396616d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -0,0 +1,188 @@
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
+
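+/**
+ * Builds a secondary R-tree index. The job pipeline is:
+ * dummy key provider -> primary index scan -> assign (compute the MBR of the
+ * spatial key) -> R-tree bulk load of <MBR fields, primary key> tuples.
+ */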
+@SuppressWarnings("rawtypes")
+public class SecondaryRTreeCreator extends SecondaryIndexCreator {
+
+    protected SecondaryRTreeCreator(PhysicalOptimizationConfig physOptConf) {
+        super(physOptConf);
+    }
+
+    @Override
+    public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        JobSpecification spec = new JobSpecification();
+        String datasetName = createIndexStmt.getDatasetName();
+        String secondaryIndexName = createIndexStmt.getIndexName();
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AsterixException("Unknown dataset " + datasetName);
+        }
+        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+        }
+        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());        
+        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(itemType);
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+        
+        // Create dummy key provider for feeding the primary index scan. 
+        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, keyProviderOp, splitProviderAndConstraint.second);
+        
+        // Create primary index scan op.
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+                spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+                splitProviderAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, primaryScanOp, splitProviderAndConstraint.second);
+
+        // ---------- START ASSIGN OP
+
+        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+        int numSecondaryKeys = secondaryKeyFields.size();
+
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException("Cannot use " + numSecondaryKeys
+                    + " fields as a key for the R-tree index. An R-tree index must have exactly one key field.");
+        }
+
+        IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+
+        int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        int numNestedSecondaryKeyFields = dimension * 2;
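+        // A spatial key of dimension d is decomposed into 2 * d nested fields,
+        // e.g., a 2-D key yields 4 fields (the MBR's two corner points).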
+
+        IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
+                numPrimaryKeys, dimension);
+
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields];
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, OrderKind.ASC);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
+
+        // Fill in serializers and comparators for primary index fields.
+        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+        }
+        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+        int[] outColumns = new int[numNestedSecondaryKeyFields];
+        int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            outColumns[i] = numPrimaryKeys + i + 1;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i + 1;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                splitProviderAndConstraint.second);
+
+        // ---------- END ASSIGN OP
+
+        // ---------- START SECONDARY INDEX BULK LOAD
+
+        int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
+            fieldPermutation[i] = i;
+        }
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+                spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+                secondarySplitsAndConstraint.first, secondaryTypeTraits,
+                secondaryComparatorFactories, fieldPermutation, 0.7f,
+                new RTreeDataflowHelperFactory(valueProviderFactories),
+                NoOpOperationCallbackProvider.INSTANCE);
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+                secondarySplitsAndConstraint.second);
+
+        // ---------- END SECONDARY INDEX BULK LOAD
+
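+        // Note that, unlike the inverted-index job, no external sort precedes
+        // the bulk load here.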
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+        spec.addRoot(secondaryBulkLoadOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+}