Addressed code review comments.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index d1666a7..0e058b0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -80,362 +80,303 @@
 // possible index job specs,
 // not just for creation.
 public abstract class SecondaryIndexCreator {
-	protected final PhysicalOptimizationConfig physOptConf;
+    protected final PhysicalOptimizationConfig physOptConf;
 
-	protected int numPrimaryKeys;
-	protected int numSecondaryKeys;
-	protected AqlMetadataProvider metadataProvider;
-	protected String dataverseName;
-	protected String datasetName;
-	protected Dataset dataset;
-	protected ARecordType itemType;
-	protected ISerializerDeserializer payloadSerde;
-	protected IFileSplitProvider primaryFileSplitProvider;
-	protected AlgebricksPartitionConstraint primaryPartitionConstraint;
-	protected IFileSplitProvider secondaryFileSplitProvider;
-	protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
-	protected String secondaryIndexName;
-	protected boolean anySecondaryKeyIsNullable = false;
+    protected int numPrimaryKeys;
+    protected int numSecondaryKeys;
+    protected AqlMetadataProvider metadataProvider;
+    protected String dataverseName;
+    protected String datasetName;
+    protected Dataset dataset;
+    protected ARecordType itemType;
+    protected ISerializerDeserializer payloadSerde;
+    protected IFileSplitProvider primaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+    protected IFileSplitProvider secondaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
+    protected String secondaryIndexName;
+    protected boolean anySecondaryKeyIsNullable = false;
 
-	protected long numElementsHint;
-	protected IBinaryComparatorFactory[] primaryComparatorFactories;
-	protected int[] primaryBloomFilterKeyFields;
-	protected RecordDescriptor primaryRecDesc;
-	protected IBinaryComparatorFactory[] secondaryComparatorFactories;
-	protected int[] secondaryBloomFilterKeyFields;
-	protected RecordDescriptor secondaryRecDesc;
-	protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
+    protected long numElementsHint;
+    protected IBinaryComparatorFactory[] primaryComparatorFactories;
+    protected int[] primaryBloomFilterKeyFields;
+    protected RecordDescriptor primaryRecDesc;
+    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
+    protected int[] secondaryBloomFilterKeyFields;
+    protected RecordDescriptor secondaryRecDesc;
+    protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
 
-	protected IAsterixPropertiesProvider propertiesProvider;
+    protected IAsterixPropertiesProvider propertiesProvider;
 
-	// Prevent public construction. Should be created via createIndexCreator().
-	protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf,
-			IAsterixPropertiesProvider propertiesProvider) {
-		this.physOptConf = physOptConf;
-		this.propertiesProvider = propertiesProvider;
-	}
+    // Prevent public construction. Should be created via createIndexCreator().
+    protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf,
+            IAsterixPropertiesProvider propertiesProvider) {
+        this.physOptConf = physOptConf;
+        this.propertiesProvider = propertiesProvider;
+    }
 
-	public static SecondaryIndexCreator createIndexCreator(
-			CompiledCreateIndexStatement createIndexStmt,
-			AqlMetadataProvider metadataProvider,
-			PhysicalOptimizationConfig physOptConf) throws AsterixException,
-			AlgebricksException {
-		IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo
-				.getInstance();
-		SecondaryIndexCreator indexCreator = null;
-		switch (createIndexStmt.getIndexType()) {
-		case BTREE: {
-			indexCreator = new SecondaryBTreeCreator(physOptConf,
-					asterixPropertiesProvider);
-			break;
-		}
-		case RTREE: {
-			indexCreator = new SecondaryRTreeCreator(physOptConf,
-					asterixPropertiesProvider);
-			break;
-		}
-		case WORD_INVIX:
-		case NGRAM_INVIX:
-		case FUZZY_WORD_INVIX:
-		case FUZZY_NGRAM_INVIX: {
-			indexCreator = new SecondaryInvertedIndexCreator(physOptConf,
-					asterixPropertiesProvider);
-			break;
-		}
-		default: {
-			throw new AsterixException("Unknown Index Type: "
-					+ createIndexStmt.getIndexType());
-		}
-		}
-		indexCreator.init(createIndexStmt, metadataProvider);
-		return indexCreator;
-	}
+    public static SecondaryIndexCreator createIndexCreator(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider, PhysicalOptimizationConfig physOptConf) throws AsterixException,
+            AlgebricksException {
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        SecondaryIndexCreator indexCreator = null;
+        switch (createIndexStmt.getIndexType()) {
+            case BTREE: {
+                indexCreator = new SecondaryBTreeCreator(physOptConf, asterixPropertiesProvider);
+                break;
+            }
+            case RTREE: {
+                indexCreator = new SecondaryRTreeCreator(physOptConf, asterixPropertiesProvider);
+                break;
+            }
+            case WORD_INVIX:
+            case NGRAM_INVIX:
+            case FUZZY_WORD_INVIX:
+            case FUZZY_NGRAM_INVIX: {
+                indexCreator = new SecondaryInvertedIndexCreator(physOptConf, asterixPropertiesProvider);
+                break;
+            }
+            default: {
+                throw new AsterixException("Unknown Index Type: " + createIndexStmt.getIndexType());
+            }
+        }
+        indexCreator.init(createIndexStmt, metadataProvider);
+        return indexCreator;
+    }
 
-	public abstract JobSpecification buildCreationJobSpec()
-			throws AsterixException, AlgebricksException;
+    public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
 
-	public abstract JobSpecification buildLoadingJobSpec()
-			throws AsterixException, AlgebricksException;
+    public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
 
-	protected void init(CompiledCreateIndexStatement createIndexStmt,
-			AqlMetadataProvider metadataProvider) throws AsterixException,
-			AlgebricksException {
-		this.metadataProvider = metadataProvider;
-		dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider
-				.getDefaultDataverseName() : createIndexStmt.getDataverseName();
-		datasetName = createIndexStmt.getDatasetName();
-		secondaryIndexName = createIndexStmt.getIndexName();
-		dataset = metadataProvider.findDataset(dataverseName, datasetName);
-		if (dataset == null) {
-			throw new AsterixException("Unknown dataset " + datasetName);
-		}
-		if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-			throw new AsterixException("Cannot index an external dataset ("
-					+ datasetName + ").");
-		}
-		itemType = (ARecordType) metadataProvider.findType(
-				dataset.getDataverseName(), dataset.getItemTypeName());
-		payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
-				.getSerializerDeserializer(itemType);
-		numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-		numSecondaryKeys = createIndexStmt.getKeyFields().size();
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-				.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-						dataverseName, datasetName, datasetName);
-		primaryFileSplitProvider = primarySplitsAndConstraint.first;
-		primaryPartitionConstraint = primarySplitsAndConstraint.second;
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-				.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-						dataverseName, datasetName, secondaryIndexName);
-		secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-		secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-		// Must be called in this order.
-		setPrimaryRecDescAndComparators();
-		setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+    protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
+            throws AsterixException, AlgebricksException {
+        this.metadataProvider = metadataProvider;
+        dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+                : createIndexStmt.getDataverseName();
+        datasetName = createIndexStmt.getDatasetName();
+        secondaryIndexName = createIndexStmt.getIndexName();
+        dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Unknown dataset " + datasetName);
+        }
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+        }
+        itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+        numSecondaryKeys = createIndexStmt.getKeyFields().size();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
+        primaryFileSplitProvider = primarySplitsAndConstraint.first;
+        primaryPartitionConstraint = primarySplitsAndConstraint.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+                        secondaryIndexName);
+        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+        // Must be called in this order.
+        setPrimaryRecDescAndComparators();
+        setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
 
-		String numElementsHintString = dataset.getHints().get("CARDINALITY");
-		if (numElementsHintString == null) {
-			numElementsHint = DatasetCardinalityHint.DEFAULT;
-		} else {
-			numElementsHint = Long.parseLong(dataset.getHints().get(
-					"CARDINALITY"));
-		}
-	}
+        String numElementsHintString = dataset.getHints().get("CARDINALITY");
+        if (numElementsHintString == null) {
+            numElementsHint = DatasetCardinalityHint.DEFAULT;
+        } else {
+            numElementsHint = Long.parseLong(dataset.getHints().get("CARDINALITY"));
+        }
+    }
 
-	protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
-		List<String> partitioningKeys = DatasetUtils
-				.getPartitioningKeys(dataset);
-		int numPrimaryKeys = partitioningKeys.size();
-		ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-		ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-		primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-		primaryBloomFilterKeyFields = new int[numPrimaryKeys];
-		ISerializerDeserializerProvider serdeProvider = metadataProvider
-				.getFormat().getSerdeProvider();
-		for (int i = 0; i < numPrimaryKeys; i++) {
-			IAType keyType;
-			try {
-				keyType = itemType.getFieldType(partitioningKeys.get(i));
-			} catch (IOException e) {
-				throw new AlgebricksException(e);
-			}
-			primaryRecFields[i] = serdeProvider
-					.getSerializerDeserializer(keyType);
-			primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(keyType, true);
-			primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
-					.getTypeTrait(keyType);
-			primaryBloomFilterKeyFields[i] = i;
-		}
-		primaryRecFields[numPrimaryKeys] = payloadSerde;
-		primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
-				.getTypeTrait(itemType);
-		primaryRecDesc = new RecordDescriptor(primaryRecFields,
-				primaryTypeTraits);
-	}
+    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        int numPrimaryKeys = partitioningKeys.size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType;
+            try {
+                keyType = itemType.getFieldType(partitioningKeys.get(i));
+            } catch (IOException e) {
+                throw new AlgebricksException(e);
+            }
+            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    keyType, true);
+            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            primaryBloomFilterKeyFields[i] = i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+    }
 
-	protected void setSecondaryRecDescAndComparators(
-			CompiledCreateIndexStatement createIndexStmt,
-			AqlMetadataProvider metadataProvider) throws AlgebricksException,
-			AsterixException {
-		List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-		secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
-		secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
-				+ numPrimaryKeys];
-		secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
-		ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
-				+ numSecondaryKeys];
-		ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys
-				+ numPrimaryKeys];
-		ISerializerDeserializerProvider serdeProvider = metadataProvider
-				.getFormat().getSerdeProvider();
-		ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat()
-				.getTypeTraitProvider();
-		IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider
-				.getFormat().getBinaryComparatorFactoryProvider();
-		for (int i = 0; i < numSecondaryKeys; i++) {
-			secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat()
-					.getFieldAccessEvaluatorFactory(itemType,
-							secondaryKeyFields.get(i), numPrimaryKeys);
-			Pair<IAType, Boolean> keyTypePair = Index
-					.getNonNullableKeyFieldType(secondaryKeyFields.get(i),
-							itemType);
-			IAType keyType = keyTypePair.first;
-			anySecondaryKeyIsNullable = anySecondaryKeyIsNullable
-					|| keyTypePair.second;
-			ISerializerDeserializer keySerde = serdeProvider
-					.getSerializerDeserializer(keyType);
-			secondaryRecFields[i] = keySerde;
-			secondaryComparatorFactories[i] = comparatorFactoryProvider
-					.getBinaryComparatorFactory(keyType, true);
-			secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
-			secondaryBloomFilterKeyFields[i] = i;
-		}
-		// Add serializers and comparators for primary index fields.
-		for (int i = 0; i < numPrimaryKeys; i++) {
-			secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc
-					.getFields()[i];
-			secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc
-					.getTypeTraits()[i];
-			secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-		}
-		secondaryRecDesc = new RecordDescriptor(secondaryRecFields,
-				secondaryTypeTraits);
-	}
+    protected void setSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+        secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
+        IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
+                .getBinaryComparatorFactoryProvider();
+        for (int i = 0; i < numSecondaryKeys; i++) {
+            secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                    itemType, secondaryKeyFields.get(i), numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+            IAType keyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+            secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+            secondaryBloomFilterKeyFields[i] = i;
+        }
+        // Add serializers and comparators for primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+    }
 
-	protected AbstractOperatorDescriptor createDummyKeyProviderOp(
-			JobSpecification spec) throws AsterixException, AlgebricksException {
-		// Build dummy tuple containing one field with a dummy value inside.
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-		DataOutput dos = tb.getDataOutput();
-		tb.reset();
-		try {
-			// Serialize dummy value into a field.
-			IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-		} catch (HyracksDataException e) {
-			throw new AsterixException(e);
-		}
-		// Add dummy field.
-		tb.addFieldEndOffset();
-		ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, keyProviderOp, primaryPartitionConstraint);
-		return keyProviderOp;
-	}
+    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
+            AlgebricksException {
+        // Build dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+                primaryPartitionConstraint);
+        return keyProviderOp;
+    }
 
-	protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
-			JobSpecification spec) throws AlgebricksException {
-		// -Infinity
-		int[] lowKeyFields = null;
-		// +Infinity
-		int[] highKeyFields = null;
-		AsterixStorageProperties storageProperties = propertiesProvider
-				.getStorageProperties();
-		BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc,
-				AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-				AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-				primaryFileSplitProvider, primaryRecDesc.getTypeTraits(),
-				primaryComparatorFactories, primaryBloomFilterKeyFields,
-				lowKeyFields, highKeyFields, true, true,
-				new LSMBTreeDataflowHelperFactory(
-						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-						storageProperties.getMemoryComponentPageSize(),
-						storageProperties.getMemoryComponentNumPages(),
-						storageProperties.getBloomFilterFalsePositiveRate()),
-				false, NoOpOperationCallbackFactory.INSTANCE);
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, primarySearchOp, primaryPartitionConstraint);
-		return primarySearchOp;
-	}
+    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
+        // -Infinity
+        int[] lowKeyFields = null;
+        // +Infinity
+        int[] highKeyFields = null;
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
+                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
+                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
+                                .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+                        storageProperties.getBloomFilterFalsePositiveRate()), false,
+                NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+                primaryPartitionConstraint);
+        return primarySearchOp;
+    }
 
-	protected AlgebricksMetaOperatorDescriptor createAssignOp(
-			JobSpecification spec, BTreeSearchOperatorDescriptor primaryScanOp,
-			int numSecondaryKeyFields) throws AlgebricksException {
-		int[] outColumns = new int[numSecondaryKeyFields];
-		int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
-		for (int i = 0; i < numSecondaryKeyFields; i++) {
-			outColumns[i] = numPrimaryKeys + i + 1;
-		}
-		int projCount = 0;
-		for (int i = 0; i < numSecondaryKeyFields; i++) {
-			projectionList[projCount++] = numPrimaryKeys + i + 1;
-		}
-		for (int i = 0; i < numPrimaryKeys; i++) {
-			projectionList[projCount++] = i;
-		}
-		IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-		for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-			sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
-					secondaryFieldAccessEvalFactories[i]);
-		}
-		AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns,
-				sefs, projectionList);
-		AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(
-				spec, 1, 1, new IPushRuntimeFactory[] { assign },
-				new RecordDescriptor[] { secondaryRecDesc });
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, asterixAssignOp, primaryPartitionConstraint);
-		return asterixAssignOp;
-	}
+    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
+            BTreeSearchOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeyFields];
+        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            outColumns[i] = numPrimaryKeys + i + 1;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i + 1;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+                    secondaryFieldAccessEvalFactories[i]);
+        }
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                primaryPartitionConstraint);
+        return asterixAssignOp;
+    }
 
-	protected ExternalSortOperatorDescriptor createSortOp(
-			JobSpecification spec,
-			IBinaryComparatorFactory[] secondaryComparatorFactories,
-			RecordDescriptor secondaryRecDesc) {
-		int[] sortFields = new int[secondaryComparatorFactories.length];
-		for (int i = 0; i < secondaryComparatorFactories.length; i++) {
-			sortFields[i] = i;
-		}
-		ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(
-				spec, physOptConf.getMaxFramesExternalSort(), sortFields,
-				secondaryComparatorFactories, secondaryRecDesc);
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, sortOp, primaryPartitionConstraint);
-		return sortOp;
-	}
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
+        int[] sortFields = new int[secondaryComparatorFactories.length];
+        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
 
-	protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(
-			JobSpecification spec, int numSecondaryKeyFields,
-			IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
-			throws MetadataException, AlgebricksException {
-		int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
-		for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
-			fieldPermutation[i] = i;
-		}
-		TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-				AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-				secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(),
-				secondaryComparatorFactories, secondaryBloomFilterKeyFields,
-				fieldPermutation, fillFactor, false, numElementsHint,
-				dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, treeIndexBulkLoadOp, secondaryPartitionConstraint);
-		return treeIndexBulkLoadOp;
-	}
+    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+            throws MetadataException, AlgebricksException {
+        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
+        for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
+            fieldPermutation[i] = i;
+        }
+        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+                secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint,
+                dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
 
-	public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(
-			JobSpecification spec, int numSecondaryKeyFields)
-			throws AlgebricksException {
-		ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
-		NotDescriptor notDesc = new NotDescriptor();
-		IsNullDescriptor isNullDesc = new IsNullDescriptor();
-		for (int i = 0; i < numSecondaryKeyFields; i++) {
-			// Access column i, and apply 'is not null'.
-			ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(
-					i);
-			ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
-					.createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
-			ICopyEvaluatorFactory notEvalFactory = notDesc
-					.createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
-			andArgsEvalFactories[i] = notEvalFactory;
-		}
-		ICopyEvaluatorFactory selectCond = null;
-		if (numSecondaryKeyFields > 1) {
-			// Create conjunctive condition where all secondary index keys must
-			// satisfy 'is not null'.
-			AndDescriptor andDesc = new AndDescriptor();
-			selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
-		} else {
-			selectCond = andArgsEvalFactories[0];
-		}
-		StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
-				new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
-						selectCond), null,
-				AqlBinaryBooleanInspectorImpl.FACTORY);
-		AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(
-				spec, 1, 1, new IPushRuntimeFactory[] { select },
-				new RecordDescriptor[] { secondaryRecDesc });
-		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
-				spec, asterixSelectOp, primaryPartitionConstraint);
-		return asterixSelectOp;
-	}
+    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
+        NotDescriptor notDesc = new NotDescriptor();
+        IsNullDescriptor isNullDesc = new IsNullDescriptor();
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            // Access column i, and apply 'is not null'.
+            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
+            ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
+                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
+            ICopyEvaluatorFactory notEvalFactory = notDesc
+                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
+            andArgsEvalFactories[i] = notEvalFactory;
+        }
+        ICopyEvaluatorFactory selectCond = null;
+        if (numSecondaryKeyFields > 1) {
+            // Create conjunctive condition where all secondary index keys must
+            // satisfy 'is not null'.
+            AndDescriptor andDesc = new AndDescriptor();
+            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+        } else {
+            selectCond = andArgsEvalFactories[0];
+        }
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
+                new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
+                null, AqlBinaryBooleanInspectorImpl.FACTORY);
+        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
+                primaryPartitionConstraint);
+        return asterixSelectOp;
+    }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 2b04e32..eacee3c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -14,8 +14,6 @@
  */
 package edu.uci.ics.asterix.common.dataflow;
 
-import java.util.Map;
-
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 1a1b0e7..531cf0a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -333,8 +333,8 @@
     }
 
     private static void enlistMetadataDataset(IMetadataIndex index, boolean create) throws Exception {
-        String filePath = metadataStore + File.separator + index.getFileNameRelativePath() + "_"
-                + runtimeContext.getMetaDataIODeviceId();
+        String filePath = metadataStore + File.separator + index.getFileNameRelativePath() + File.separator
+                + "device_id_" + runtimeContext.getMetaDataIODeviceId();
         FileReference file = new FileReference(new File(filePath));
         IInMemoryBufferCache memBufferCache = new InMemoryBufferCache(new HeapBufferAllocator(), DEFAULT_MEM_PAGE_SIZE,
                 DEFAULT_MEM_NUM_PAGES, new TransientFileMapManager());