Merged asterix_stabilization r192:r194.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@195 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index c9cb798..b87cae3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
@@ -120,104 +121,108 @@
         return spec;
     }
 
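+    /**
+     * Creates an operator that emits a single tuple consisting of one dummy integer field.
+     * It is used to trigger the downstream primary index scan exactly once per partition.
+     */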
+    public static AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec)
+            throws AsterixException, AlgebricksException {
+        // Build a dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize the dummy value into the field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Mark the end of the dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        return keyProviderOp;
+    }
+
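+    /**
+     * Creates a B-tree search operator that scans the primary index of the given dataset.
+     * The output record consists of the primary key fields followed by the record payload.
+     */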
+    public static BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec,
+            AqlCompiledMetadataDeclarations metadata, AqlCompiledDatasetDecl compiledDatasetDecl,
+            ARecordType itemType, ISerializerDeserializer payloadSerde, IFileSplitProvider splitProvider)
+            throws AlgebricksException, MetadataException {
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl);
+        int i = 0;
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            primaryRecFields[i] = keySerde;
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, OrderKind.ASC);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
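+        // The final field of the scan's output record is the payload, i.e., the record itself.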
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+
+        // -Infinity
+        int[] lowKeyFields = null;
+        // +Infinity
+        int[] highKeyFields = null;
+        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+
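+        // Null low/high key fields with both inclusive flags set to true make the search a full scan of the index.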
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE, splitProvider,
+                primaryTypeTraits, primaryComparatorFactories, lowKeyFields, highKeyFields, true, true,
+                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        return primarySearchOp;
+    }
+
     @SuppressWarnings("unchecked")
     public static JobSpecification createBtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
-
         JobSpecification spec = new JobSpecification();
-
         String datasetName = createIndexStmt.getDatasetName();
         String secondaryIndexName = createIndexStmt.getIndexName();
-
         AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
         if (compiledDatasetDecl == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
-        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(itemType);
-
         if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
             throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
         }
-
-        AqlCompiledDatasetDecl srcCompiledDatasetDecl = compiledDatasetDecl;
+        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+        ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(itemType);
         int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
 
-        // ---------- START GENERAL BTREE STUFF
-        IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
-        IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
-
-        // ---------- END GENERAL BTREE STUFF
-
-        // ---------- START KEY PROVIDER OP
-
-        // TODO: should actually be empty tuple source
-        // build tuple containing low and high search keys
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
-        DataOutput dos = tb.getDataOutput();
-
-        tb.reset();
-        try {
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        } // dummy field
-        tb.addFieldEndOffset();
-
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                keyProviderSplitsAndConstraint.second);
-
-        // ---------- END KEY PROVIDER OP
-
-        // ---------- START PRIMARY INDEX SCAN
-
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-        int i = 0;
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
-        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(srcCompiledDatasetDecl);
-        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            primaryRecFields[i] = keySerde;
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, OrderKind.ASC);
-            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
-        int[] lowKeyFields = null; // -infinity
-        int[] highKeyFields = null; // +infinity
-        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
-		BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, storageManager, treeRegistryProvider,
-				primarySplitsAndConstraint.first, primaryTypeTraits,
-				primaryComparatorFactories, lowKeyFields, highKeyFields, true,
-				true, new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primarySplitsAndConstraint.second);
-
-        // ---------- END PRIMARY INDEX SCAN
-
+
+        // Create the dummy key provider feeding the primary index scan.
+        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+                splitProviderAndConstraint.second);
+
+        // Create the primary index scan op.
+        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec, metadata, compiledDatasetDecl,
+                itemType, payloadSerde, splitProviderAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primaryScanOp,
+                splitProviderAndConstraint.second);
+
         // ---------- START ASSIGN OP
 
         List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
@@ -227,7 +232,9 @@
         IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
                 + numPrimaryKeys];
         ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        for (i = 0; i < numSecondaryKeys; i++) {
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+                .getSerdeProvider();
+        for (int i = 0; i < numSecondaryKeys; i++) {
             evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
                     numPrimaryKeys);
             IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
@@ -237,24 +244,26 @@
                     keyType, OrderKind.ASC);
             secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
-        // fill in serializers and comparators for primary index fields
-        for (i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
+        // Fill in serializers and comparators for primary index fields.
+        RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+        IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
             secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-            secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i];
         }
         RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
 
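+        // The assign op appends the secondary keys after the record's primary keys and payload (hence the +1);
+        // the projection then keeps the secondary keys followed by the primary keys, dropping the payload.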
         int[] outColumns = new int[numSecondaryKeys];
         int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (i = 0; i < numSecondaryKeys; i++) {
+        for (int i = 0; i < numSecondaryKeys; i++) {
             outColumns[i] = numPrimaryKeys + i + 1;
         }
         int projCount = 0;
-        for (i = 0; i < numSecondaryKeys; i++) {
+        for (int i = 0; i < numSecondaryKeys; i++) {
             projectionList[projCount++] = numPrimaryKeys + i + 1;
         }
-        for (i = 0; i < numPrimaryKeys; i++) {
+        for (int i = 0; i < numPrimaryKeys; i++) {
             projectionList[projCount++] = i;
         }
 
@@ -273,7 +282,7 @@
         // ---------- START EXTERNAL SORT OP
 
         int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
-        for (i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
             sortFields[i] = i;
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
@@ -291,7 +300,7 @@
         // ---------- START SECONDARY INDEX BULK LOAD
 
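+        // Identity permutation: tuples already arrive in (secondary keys, primary keys) order.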
         int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
-        for (i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+        for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
             fieldPermutation[i] = i;
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
@@ -299,7 +308,7 @@
 
         // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
 		TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, treeRegistryProvider,
+				spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
 				secondarySplitsAndConstraint.first, secondaryTypeTraits,
 				secondaryComparatorFactories, fieldPermutation, 0.7f,
 				new BTreeDataflowHelperFactory(),
@@ -311,9 +320,9 @@
 
         // ---------- START CONNECT THE OPERATORS
 
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
 
-        spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);