More code sharing between the index creation job generators. More to come.
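
The shared pieces now live in SecondaryIndexCreator: init() validates the dataset and
resolves the item type, primary keys, file splits, and partition constraints, while the
new helpers build the dummy key provider, primary index scan, assign, and external sort
operators (each helper also registers its own partition constraint on the spec). Below is
a rough sketch of how a concrete creator is expected to compose them, assuming the
subclass extends SecondaryIndexCreator so the base-class fields and imports are in scope.
It is illustrative only; the getKeyFields() accessor and the connector/bulk-load wiring
mentioned in the comments are assumptions, not part of this change.

    @Override
    public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
            AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
        // Shared setup on the base-class fields (dataset decl, item type,
        // numPrimaryKeys, primary file splits, and partition constraint).
        init(createIndexStmt, metadata);
        JobSpecification spec = new JobSpecification();
        // Assumed accessor for the secondary key fields of the statement.
        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
        // Shared helpers from SecondaryIndexCreator; each sets its own
        // partition constraint on the spec.
        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
        setSecondaryIndexFactories(primaryScanOp, secondaryKeyFields);
        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, secondaryKeyFields);
        ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
        // Connector wiring and the secondary-index bulk load remain
        // index-specific for now and are omitted from this sketch ("more to come").
        return spec;
    }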

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@210 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 0bfb209..9f3d508 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -2,18 +2,13 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
 import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
 import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
@@ -52,35 +47,18 @@
     @Override
     public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        init(createIndexStmt, metadata);
         JobSpecification spec = new JobSpecification();
-        String datasetName = createIndexStmt.getDatasetName();
-        String secondaryIndexName = createIndexStmt.getIndexName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
-        }
-        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
-        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(itemType);
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
         
         // Create dummy key provider for feeding the primary index scan. 
         AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, splitProviderAndConstraint.second);
+                spec, keyProviderOp, primaryPartitionConstrant);
         
         // Create primary index scan op.
-        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
-                spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
-                splitProviderAndConstraint.first);
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, splitProviderAndConstraint.second);
+                spec, keyProviderOp, primaryPartitionConstrant);
         
         // ---------- START ASSIGN OP
 
@@ -130,23 +108,13 @@
         AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                 new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                splitProviderAndConstraint.second);
+                primaryPartitionConstrant);
 
         // ---------- END ASSIGN OP
 
-        // ---------- START EXTERNAL SORT OP
-
-        int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
-            sortFields[i] = i;
-        }
-        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
-                secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
-                splitProviderAndConstraint.second);
-
-        // ---------- END EXTERNAL SORT OP
+        // Sort by secondary keys, then primary keys.
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstrant);
 
         // ---------- START SECONDARY INDEX BULK LOAD
 
@@ -154,11 +122,8 @@
         for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
             fieldPermutation[i] = i;
         }
-
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
-        // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
                 spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
                 secondarySplitsAndConstraint.first, secondaryTypeTraits,
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index fb17ddc..5d2a71f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -11,8 +11,8 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -22,7 +22,11 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
@@ -38,6 +42,7 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
@@ -50,12 +55,17 @@
     protected int numSecondaryKeys;
     protected AqlCompiledMetadataDeclarations metadata;
     protected String datasetName;
+    protected AqlCompiledDatasetDecl datasetDecl;
     protected ARecordType itemType;    
     protected ISerializerDeserializer payloadSerde;
     protected IFileSplitProvider primaryFileSplitProvider;
     protected AlgebricksPartitionConstraint primaryPartitionConstrant;
     protected String secondaryIndexName;
     
+    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
+    protected RecordDescriptor secondaryRecDesc;
+    protected IEvaluatorFactory[] evalFactories;
+    
     // Prevent public construction.
     protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf) {
         this.physOptConf = physOptConf;
@@ -81,20 +91,21 @@
     public abstract JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException;
     
-    protected void init(CompiledCreateIndexStatement createIndexStmt) throws AsterixException, AlgebricksException {
+    protected void init(CompiledCreateIndexStatement createIndexStmt, AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        this.metadata = metadata;
         datasetName = createIndexStmt.getDatasetName();
         secondaryIndexName = createIndexStmt.getIndexName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        datasetDecl = metadata.findDataset(datasetName);
+        if (datasetDecl == null) {
             throw new AsterixException("Unknown dataset " + datasetName);
         }
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (datasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
             throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
         }
-        itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+        itemType = (ARecordType) metadata.findType(datasetDecl.getItemTypeName());
         payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
                 .getSerializerDeserializer(itemType);
-        numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+        numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
         primaryFileSplitProvider = splitProviderAndConstraint.first;
@@ -120,39 +131,31 @@
         ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
                 spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
                 tb.getSize());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, keyProviderOp, primaryPartitionConstrant);
         return keyProviderOp;
     }
     
-    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
-            JobSpecification spec, AqlCompiledMetadataDeclarations metadata,
-            AqlCompiledDatasetDecl compiledDatasetDecl, ARecordType itemType,
-            ISerializerDeserializer payloadSerde,
-            IFileSplitProvider splitProvider) throws AlgebricksException,
-            MetadataException {
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(
-                compiledDatasetDecl).size();
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];       
+    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
         ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];       
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
-                .getSerdeProvider();
+        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
         List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl);
+                .getPartitioningFunctions(datasetDecl);
         int i = 0;
         for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
             IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = serdeProvider
-                    .getSerializerDeserializer(keyType);
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
             primaryRecFields[i] = keySerde;
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(keyType, OrderKind.ASC);
-            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
-                    .getTypeTrait(keyType);
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             ++i;
         }
         primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
-                .getTypeTrait(itemType);
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
 
         // -Infinity
         int[] lowKeyFields = null;
@@ -160,12 +163,104 @@
         int[] highKeyFields = null;
         RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
 
-        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
-                spec, primaryRecDesc, AsterixStorageManagerInterface.INSTANCE,
-                AsterixTreeRegistryProvider.INSTANCE, splitProvider,
-                primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
-                highKeyFields, true, true, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+                primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields, highKeyFields,
+                true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, primarySearchOp, primaryPartitionConstrant);
         return primarySearchOp;
     }
+    
+    protected void setSecondaryIndexFactories(BTreeSearchOperatorDescriptor primaryScanOp,
+            List<String> secondaryKeyFields) throws AlgebricksException {
+        int numSecondaryKeys = secondaryKeyFields.size();
+        evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+                    numPrimaryKeys);
+            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+        // Fill in serializers, comparators, and type traits for the primary index fields.
+        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+    }
+    
+    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
+            BTreeSearchOperatorDescriptor primaryScanOp, List<String> secondaryKeyFields) throws AlgebricksException {
+        int numSecondaryKeys = secondaryKeyFields.size();
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+                + numPrimaryKeys];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+                    numPrimaryKeys);
+            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+        // Fill in serializers, comparators, and type traits for the primary index fields.
+        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+        }
+        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+        int[] outColumns = new int[numSecondaryKeys];
+        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            outColumns[i] = numPrimaryKeys + i + 1;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i + 1;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                primaryPartitionConstrant);
+        return asterixAssignOp;
+    }
+    
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
+        int[] sortFields = new int[secondaryComparatorFactories.length];
+        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+                primaryPartitionConstrant);
+        return sortOp;
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 4eb981d..120edeb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -244,14 +244,15 @@
         IBinaryComparatorFactory[] tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
         tokenKeyPairComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
                 fieldsToTokenizeType, OrderKind.ASC);
-        for (i = 0; i < numPrimaryKeys; i++)
+        for (i = 0; i < numPrimaryKeys; i++) {
             tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
+        }
 
-        int[] sortFields = new int[numTokenKeyPairFields]; // <token, primary
-        // key a, primary
-        // key b, etc.>
-        for (i = 0; i < numTokenKeyPairFields; i++)
+        // <token, primary key a, primary key b, etc.>
+        int[] sortFields = new int[numTokenKeyPairFields];
+        for (i = 0; i < numTokenKeyPairFields; i++) {
             sortFields[i] = i;
+        }
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 396616d..93b48a3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -2,7 +2,6 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
 import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -10,11 +9,8 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
@@ -53,35 +49,14 @@
     @Override
     public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+        init(createIndexStmt, metadata);
         JobSpecification spec = new JobSpecification();
-        String datasetName = createIndexStmt.getDatasetName();
-        String secondaryIndexName = createIndexStmt.getIndexName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
-        }
-        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());        
-        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(itemType);
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
         
         // Create dummy key provider for feeding the primary index scan. 
         AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, splitProviderAndConstraint.second);
         
         // Create primary index scan op.
-        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
-                spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
-                splitProviderAndConstraint.first);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-                spec, keyProviderOp, splitProviderAndConstraint.second);
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
 
         // ---------- START ASSIGN OP
 
@@ -148,9 +123,8 @@
         AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
         AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                 new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                splitProviderAndConstraint.second);
+                primaryPartitionConstrant);
 
         // ---------- END ASSIGN OP