More cleaning: reuse a single split provider/partition constraint per dataset and extract the dummy key provider and primary index scan into helper methods in IndexOperations.java.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@196 eaa15691-b419-025a-1212-ee371bd00084
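
Note: the rewritten rtree job spec below calls two helpers, createDummyKeyProviderOp and createPrimaryIndexScanOp, whose bodies fall outside this hunk. The following is a minimal sketch of what they likely contain, reconstructed from the inline code this change deletes; the signatures are inferred from the call sites, so the actual extracted methods may differ. Both would live in IndexOperations.java and rely on its existing imports.

    // Sketch only: reconstructed from the deleted inline code; names and
    // signatures are inferred from the call sites in this change.
    private static AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec)
            throws AsterixException {
        // Build a tuple with a single dummy integer field; the constant tuple
        // source exists only to trigger the primary index scan.
        // TODO carried over from the old code: should be an empty tuple source.
        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
        DataOutput dos = tb.getDataOutput();
        tb.reset();
        try {
            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
        } catch (HyracksDataException e) {
            throw new AsterixException(e);
        }
        tb.addFieldEndOffset();
        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
        return new ConstantTupleSourceOperatorDescriptor(spec, keyRecDesc, tb.getFieldEndOffsets(),
                tb.getByteArray(), tb.getSize());
    }

    private static BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec,
            AqlCompiledMetadataDeclarations metadata, AqlCompiledDatasetDecl compiledDatasetDecl,
            ARecordType itemType, ISerializerDeserializer payloadSerde, IFileSplitProvider splitProvider)
            throws AlgebricksException {
        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
        int i = 0;
        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
                .getPartitioningFunctions(compiledDatasetDecl)) {
            IAType keyType = evalFactoryAndType.third;
            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
                    .getBinaryComparatorFactory(keyType, OrderKind.ASC);
            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
            ++i;
        }
        primaryRecFields[numPrimaryKeys] = payloadSerde;
        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
        // Assumes a RecordDescriptor that also carries type traits, since the
        // rtree hunk reads them back via getRecordDescriptor().getTypeTraits().
        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
        // Null low/high key fields scan the whole index (-infinity to +infinity).
        return new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
                splitProvider, primaryTypeTraits, primaryComparatorFactories, null, null, true, true,
                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
    }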
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index b87cae3..c0053fb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -267,41 +267,34 @@
             projectionList[projCount++] = i;
         }
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
         AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
         AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                 new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                assignSplitsAndConstraint.second);
+                splitProviderAndConstraint.second);
 
         // ---------- END ASSIGN OP
 
         // ---------- START EXTERNAL SORT OP
 
         int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
             sortFields[i] = i;
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
+        }
         ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
                 physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
                 secondaryRecDesc);
-
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
-                sorterSplitsAndConstraint.second);
+                splitProviderAndConstraint.second);
 
         // ---------- END EXTERNAL SORT OP
 
         // ---------- START SECONDARY INDEX BULK LOAD
 
         int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
             fieldPermutation[i] = i;
+        }
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
@@ -318,19 +311,12 @@
 
         // ---------- END SECONDARY INDEX BULK LOAD
 
-        // ---------- START CONNECT THE OPERATORS
-
+        // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-
         spec.addRoot(secondaryBulkLoadOp);
-
-        // ---------- END CONNECT THE OPERATORS
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
 
@@ -339,103 +325,35 @@
     @SuppressWarnings("unchecked")
     public static JobSpecification createRtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
-
         JobSpecification spec = new JobSpecification();
-
-        String primaryIndexName = createIndexStmt.getDatasetName();
+        String datasetName = createIndexStmt.getDatasetName();
         String secondaryIndexName = createIndexStmt.getIndexName();
-
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(primaryIndexName);
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
         if (compiledDatasetDecl == null) {
-            throw new AsterixException("Could not find dataset " + primaryIndexName);
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
         }
         ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
-        ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
-
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
-        }
+        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(itemType);
         int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
 
-        // ---------- START GENERAL BTREE STUFF
-
-        IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
-        IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
-
-        // ---------- END GENERAL BTREE STUFF
-
-        // ---------- START KEY PROVIDER OP
-
-        // TODO: should actually be empty tuple source
-        // build tuple containing low and high search keys
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
-        DataOutput dos = tb.getDataOutput();
-
-        tb.reset();
-        try {
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        } // dummy field
-        tb.addFieldEndOffset();
-
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                keyProviderSplitsAndConstraint.second);
-
-        // ---------- END KEY PROVIDER OP
-
-        // ---------- START PRIMARY INDEX SCAN
-
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-        int i = 0;
-        serdeProvider = metadata.getFormat().getSerdeProvider();
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl)) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            primaryRecFields[i] = keySerde;
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
-        ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
-                .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
-        ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
-                .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
-
-        int[] lowKeyFields = null; // -infinity
-        int[] highKeyFields = null; // +infinity
-        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
-		BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, storageManager, treeRegistryProvider,
-				primarySplitsAndConstraint.first, primaryTypeTraits,
-				primaryComparatorFactories, lowKeyFields, highKeyFields, true,
-				true, new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primarySplitsAndConstraint.second);
-
-        // ---------- END PRIMARY INDEX SCAN
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+        // Create dummy key provider for feeding the primary index scan.
+        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, keyProviderOp, splitProviderAndConstraint.second);
+
+        // Create primary index scan op.
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+                spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+                splitProviderAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, primaryScanOp, splitProviderAndConstraint.second);
 
         // ---------- START ASSIGN OP
 
@@ -468,7 +386,7 @@
 
         IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
         IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-        for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
             ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
                     .getSerializerDeserializer(nestedKeyType);
             secondaryRecFields[i] = keySerde;
@@ -478,49 +396,49 @@
             valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
         }
 
-        // fill in serializers and comparators for primary index fields
-        for (i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecFields[i];
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryTypeTraits[i];
+        // Fill in serializers and comparators for primary index fields.
+        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
         }
         RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
 
         int[] outColumns = new int[numNestedSecondaryKeyFields];
         int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-        for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
             outColumns[i] = numPrimaryKeys + i + 1;
         }
         int projCount = 0;
-        for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
             projectionList[projCount++] = numPrimaryKeys + i + 1;
         }
-        for (i = 0; i < numPrimaryKeys; i++) {
+        for (int i = 0; i < numPrimaryKeys; i++) {
             projectionList[projCount++] = i;
         }
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
         AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
         AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                 new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                assignSplitsAndConstraint.second);
+                splitProviderAndConstraint.second);
 
         // ---------- END ASSIGN OP
 
         // ---------- START SECONDARY INDEX BULK LOAD
 
         int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
-        for (i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++)
+        for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
             fieldPermutation[i] = i;
+        }
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
 
 		TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, treeRegistryProvider,
+				spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
 				secondarySplitsAndConstraint.first, secondaryTypeTraits,
 				secondaryComparatorFactories, fieldPermutation, 0.7f,
 				new RTreeDataflowHelperFactory(valueProviderFactories),
@@ -531,17 +449,11 @@
 
         // ---------- END SECONDARY INDEX BULK LOAD
 
-        // ---------- START CONNECT THE OPERATORS
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
-
+        // Connect the operators.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
-
         spec.addRoot(secondaryBulkLoadOp);
-
-        // ---------- END CONNECT THE OPERATORS
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;