Finished code sharing between job gen of BTree and RTree index creation (InvertedIndex will be cleaned next).

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@211 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 9f3d508..cb292a2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -1,43 +1,22 @@
 package edu.uci.ics.asterix.file;
 
-import java.util.List;
-
-import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
-import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
-import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryBTreeCreator extends SecondaryIndexCreator {
     
     protected SecondaryBTreeCreator(PhysicalOptimizationConfig physOptConf) {
@@ -52,88 +31,20 @@
         
         // Create dummy key provider for feeding the primary index scan. 
         AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, primaryPartitionConstrant);
         
         // Create primary index scan op.
         BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, primaryPartitionConstrant);
         
-        // ---------- START ASSIGN OP
-
-        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-        int numSecondaryKeys = secondaryKeyFields.size();
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
-                + numPrimaryKeys];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
-                .getSerdeProvider();
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
-                    numPrimaryKeys);
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-        // Fill in serializers and comparators for primary index fields.
-        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
-        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-        }
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            outColumns[i] = numPrimaryKeys + i + 1;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i + 1;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                primaryPartitionConstrant);
-
-        // ---------- END ASSIGN OP
-
+        // Assign op.
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
+        
         // Sort by secondary keys.
         ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstrant);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
 
-        // ---------- START SECONDARY INDEX BULK LOAD
-
-        int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
-            fieldPermutation[i] = i;
-        }
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-                spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
-                secondarySplitsAndConstraint.first, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutation, 0.7f,
-                new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
-                secondarySplitsAndConstraint.second);
-
-        // ---------- END SECONDARY INDEX BULK LOAD
+        // Create secondary BTree bulk load op.
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys,
+                new BTreeDataflowHelperFactory(), BTree.DEFAULT_FILL_FACTOR);
 
         // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 5d2a71f..e331d10 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
@@ -45,6 +46,8 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 
 @SuppressWarnings("rawtypes")
@@ -59,9 +62,11 @@
     protected ARecordType itemType;    
     protected ISerializerDeserializer payloadSerde;
     protected IFileSplitProvider primaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint primaryPartitionConstrant;
+    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
     protected String secondaryIndexName;
-    
+
+    protected IBinaryComparatorFactory[] primaryComparatorFactories;
+    protected RecordDescriptor primaryRecDesc;
     protected IBinaryComparatorFactory[] secondaryComparatorFactories;
     protected RecordDescriptor secondaryRecDesc;
     protected IEvaluatorFactory[] evalFactories;
@@ -106,10 +111,62 @@
         payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
                 .getSerializerDeserializer(itemType);
         numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+        numSecondaryKeys = createIndexStmt.getKeyFields().size();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
         primaryFileSplitProvider = splitProviderAndConstraint.first;
-        primaryPartitionConstrant = splitProviderAndConstraint.second;
+        primaryPartitionConstraint = splitProviderAndConstraint.second;
+        // Must be called in this order.
+        setPrimaryRecDescAndComparators();
+        setSecondaryRecDescAndComparators(createIndexStmt.getKeyFields());
+    }
+    
+    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+                .getPartitioningFunctions(datasetDecl);
+        int i = 0;
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            primaryRecFields[i] = keySerde;
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+    }
+    
+    protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException, AsterixException {
+        evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+                    numPrimaryKeys);
+            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+        // Add serializers and comparators for primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
     }
     
     protected AbstractOperatorDescriptor createDummyKeyProviderOp(
@@ -132,111 +189,33 @@
                 spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
                 tb.getSize());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, primaryPartitionConstrant);
+                spec, keyProviderOp, primaryPartitionConstraint);
         return keyProviderOp;
     }
     
     protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
-        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(datasetDecl);
-        int i = 0;
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            primaryRecFields[i] = keySerde;
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
         // -Infinity
         int[] lowKeyFields = null;
         // +Infinity
         int[] highKeyFields = null;
-        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
-
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
-                primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields, highKeyFields,
-                true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, primarySearchOp, primaryPartitionConstrant);
+                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+                primaryPartitionConstraint);
         return primarySearchOp;
     }
     
-    protected void setSecondaryIndexFactories(BTreeSearchOperatorDescriptor primaryScanOp,
-            List<String> secondaryKeyFields) throws AlgebricksException {
-        int numSecondaryKeys = secondaryKeyFields.size();
-        evalFactories = new IEvaluatorFactory[numSecondaryKeys];
-        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
-                    numPrimaryKeys);
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-        // Fill in serializers and comparators for primary index fields.
-        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
-        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-        }
-        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
-    }
-    
     protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
-            BTreeSearchOperatorDescriptor primaryScanOp, List<String> secondaryKeyFields) throws AlgebricksException {
-        int numSecondaryKeys = secondaryKeyFields.size();
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
-                + numPrimaryKeys];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
-                    numPrimaryKeys);
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-        // Fill in serializers and comparators for primary index fields.
-        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
-        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-        }
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
+            BTreeSearchOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeyFields];
+        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
             outColumns[i] = numPrimaryKeys + i + 1;
         }
         int projCount = 0;
-        for (int i = 0; i < numSecondaryKeys; i++) {
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
             projectionList[projCount++] = numPrimaryKeys + i + 1;
         }
         for (int i = 0; i < numPrimaryKeys; i++) {
@@ -247,7 +226,7 @@
         AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                 new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                primaryPartitionConstrant);
+                primaryPartitionConstraint);
         return asterixAssignOp;
     }
     
@@ -260,7 +239,25 @@
         ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
                 physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
-                primaryPartitionConstrant);
+                primaryPartitionConstraint);
         return sortOp;
     }
+    
+    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) throws MetadataException,
+            AlgebricksException {
+        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
+            fieldPermutation[i] = i;
+        }
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+                secondarySplitsAndConstraint.first, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+                fieldPermutation, fillFactor, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+                secondarySplitsAndConstraint.second);
+        return treeIndexBulkLoadOp;
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 93b48a3..6fb8ee0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -2,8 +2,6 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
-import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -15,16 +13,10 @@
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -32,21 +24,65 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
 
 @SuppressWarnings("rawtypes")
 public class SecondaryRTreeCreator extends SecondaryIndexCreator {
 
+    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+    protected int numNestedSecondaryKeyFields;
+    
     protected SecondaryRTreeCreator(PhysicalOptimizationConfig physOptConf) {
         super(physOptConf);
     }
 
     @Override
+    protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException,
+            AsterixException {
+        int numSecondaryKeys = secondaryKeyFields.size();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException(
+                    "Cannot use "
+                            + numSecondaryKeys
+                            + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+        }
+        IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0), numPrimaryKeys,
+                numDimensions);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, OrderKind.ASC);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
+        // Add serializers and comparators for primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+    }
+    
+    @Override
     public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
         init(createIndexStmt, metadata);
@@ -58,97 +94,13 @@
         // Create primary index scan op.
         BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
 
-        // ---------- START ASSIGN OP
+        // Assign op.
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numNestedSecondaryKeyFields);
 
-        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-        int numSecondaryKeys = secondaryKeyFields.size();
-
-        if (numSecondaryKeys != 1) {
-            throw new AsterixException(
-                    "Cannot use "
-                            + numSecondaryKeys
-                            + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
-        }
-
-        IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
-        }
-
-        int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numNestedSecondaryKeyFields = dimension * 2;
-
-        IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
-                numPrimaryKeys, dimension);
-
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-                + numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-
-        IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    nestedKeyType, OrderKind.ASC);
-            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-        }
-
-        // Fill in serializers and comparators for primary index fields.
-        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
-        }
-        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
-        int[] outColumns = new int[numNestedSecondaryKeyFields];
-        int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            outColumns[i] = numPrimaryKeys + i + 1;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i + 1;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                primaryPartitionConstrant);
-
-        // ---------- END ASSIGN OP
-
-        // ---------- START SECONDARY INDEX BULK LOAD
-
-        int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-        for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
-            fieldPermutation[i] = i;
-        }
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-                spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
-                secondarySplitsAndConstraint.first, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutation, 0.7f,
-                new RTreeDataflowHelperFactory(valueProviderFactories),
-                NoOpOperationCallbackProvider.INSTANCE);
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
-                secondarySplitsAndConstraint.second);
-
-        // ---------- END SECONDARY INDEX BULK LOAD
+        // Create secondary RTree bulk load op.
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
+                numNestedSecondaryKeyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
+                BTree.DEFAULT_FILL_FACTOR);
 
         // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);