Fixed various bugs that prevented an lsm index from being partitioned accross iodevices.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 3600bea..742de84 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -171,7 +171,7 @@
         //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
         ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
                 comparatorFactories, blooFilterKeyFields, true, storageProperties.getMemoryComponentPageSize(),
-                storageProperties.getMemoryComponentNumPages());
+                storageProperties.getMemoryComponentNumPages(), fs);
         ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                 localResourceMetadata, LocalResource.LSMBTreeResource);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index c3a2c01..20b66e9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -40,7 +40,8 @@
         //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
         ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
                 secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, false,
-                storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages());
+                storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+                secondaryFileSplitProvider.getFileSplits());
         ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                 localResourceMetadata, LocalResource.LSMBTreeResource);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 253df4b..d1666a7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -76,308 +76,366 @@
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 
 @SuppressWarnings("rawtypes")
-// TODO: We should eventually have a hierarchy of classes that can create all possible index job specs, 
+// TODO: We should eventually have a hierarchy of classes that can create all
+// possible index job specs,
 // not just for creation.
 public abstract class SecondaryIndexCreator {
-    protected final PhysicalOptimizationConfig physOptConf;
+	protected final PhysicalOptimizationConfig physOptConf;
 
-    protected int numPrimaryKeys;
-    protected int numSecondaryKeys;
-    protected AqlMetadataProvider metadataProvider;
-    protected String dataverseName;
-    protected String datasetName;
-    protected Dataset dataset;
-    protected ARecordType itemType;
-    protected ISerializerDeserializer payloadSerde;
-    protected IFileSplitProvider primaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
-    protected IFileSplitProvider secondaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
-    protected String secondaryIndexName;
-    protected boolean anySecondaryKeyIsNullable = false;
+	protected int numPrimaryKeys;
+	protected int numSecondaryKeys;
+	protected AqlMetadataProvider metadataProvider;
+	protected String dataverseName;
+	protected String datasetName;
+	protected Dataset dataset;
+	protected ARecordType itemType;
+	protected ISerializerDeserializer payloadSerde;
+	protected IFileSplitProvider primaryFileSplitProvider;
+	protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+	protected IFileSplitProvider secondaryFileSplitProvider;
+	protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
+	protected String secondaryIndexName;
+	protected boolean anySecondaryKeyIsNullable = false;
 
-    protected long numElementsHint;
-    protected IBinaryComparatorFactory[] primaryComparatorFactories;
-    protected int[] primaryBloomFilterKeyFields;
-    protected RecordDescriptor primaryRecDesc;
-    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
-    protected int[] secondaryBloomFilterKeyFields;
-    protected RecordDescriptor secondaryRecDesc;
-    protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
+	protected long numElementsHint;
+	protected IBinaryComparatorFactory[] primaryComparatorFactories;
+	protected int[] primaryBloomFilterKeyFields;
+	protected RecordDescriptor primaryRecDesc;
+	protected IBinaryComparatorFactory[] secondaryComparatorFactories;
+	protected int[] secondaryBloomFilterKeyFields;
+	protected RecordDescriptor secondaryRecDesc;
+	protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
 
-    protected IAsterixPropertiesProvider propertiesProvider;
+	protected IAsterixPropertiesProvider propertiesProvider;
 
-    // Prevent public construction. Should be created via createIndexCreator().
-    protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf,
-            IAsterixPropertiesProvider propertiesProvider) {
-        this.physOptConf = physOptConf;
-        this.propertiesProvider = propertiesProvider;
-    }
+	// Prevent public construction. Should be created via createIndexCreator().
+	protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf,
+			IAsterixPropertiesProvider propertiesProvider) {
+		this.physOptConf = physOptConf;
+		this.propertiesProvider = propertiesProvider;
+	}
 
-    public static SecondaryIndexCreator createIndexCreator(CompiledCreateIndexStatement createIndexStmt,
-            AqlMetadataProvider metadataProvider, PhysicalOptimizationConfig physOptConf) throws AsterixException,
-            AlgebricksException {
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
-        SecondaryIndexCreator indexCreator = null;
-        switch (createIndexStmt.getIndexType()) {
-            case BTREE: {
-                indexCreator = new SecondaryBTreeCreator(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            case RTREE: {
-                indexCreator = new SecondaryRTreeCreator(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            case WORD_INVIX:
-            case NGRAM_INVIX:
-            case FUZZY_WORD_INVIX:
-            case FUZZY_NGRAM_INVIX: {
-                indexCreator = new SecondaryInvertedIndexCreator(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            default: {
-                throw new AsterixException("Unknown Index Type: " + createIndexStmt.getIndexType());
-            }
-        }
-        indexCreator.init(createIndexStmt, metadataProvider);
-        return indexCreator;
-    }
+	public static SecondaryIndexCreator createIndexCreator(
+			CompiledCreateIndexStatement createIndexStmt,
+			AqlMetadataProvider metadataProvider,
+			PhysicalOptimizationConfig physOptConf) throws AsterixException,
+			AlgebricksException {
+		IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo
+				.getInstance();
+		SecondaryIndexCreator indexCreator = null;
+		switch (createIndexStmt.getIndexType()) {
+		case BTREE: {
+			indexCreator = new SecondaryBTreeCreator(physOptConf,
+					asterixPropertiesProvider);
+			break;
+		}
+		case RTREE: {
+			indexCreator = new SecondaryRTreeCreator(physOptConf,
+					asterixPropertiesProvider);
+			break;
+		}
+		case WORD_INVIX:
+		case NGRAM_INVIX:
+		case FUZZY_WORD_INVIX:
+		case FUZZY_NGRAM_INVIX: {
+			indexCreator = new SecondaryInvertedIndexCreator(physOptConf,
+					asterixPropertiesProvider);
+			break;
+		}
+		default: {
+			throw new AsterixException("Unknown Index Type: "
+					+ createIndexStmt.getIndexType());
+		}
+		}
+		indexCreator.init(createIndexStmt, metadataProvider);
+		return indexCreator;
+	}
 
-    public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
+	public abstract JobSpecification buildCreationJobSpec()
+			throws AsterixException, AlgebricksException;
 
-    public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
+	public abstract JobSpecification buildLoadingJobSpec()
+			throws AsterixException, AlgebricksException;
 
-    protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
-            throws AsterixException, AlgebricksException {
-        this.metadataProvider = metadataProvider;
-        dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
-                : createIndexStmt.getDataverseName();
-        datasetName = createIndexStmt.getDatasetName();
-        secondaryIndexName = createIndexStmt.getIndexName();
-        dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException("Unknown dataset " + datasetName);
-        }
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
-        }
-        itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
-        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-        numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-        numSecondaryKeys = createIndexStmt.getKeyFields().size();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
-        primaryFileSplitProvider = primarySplitsAndConstraint.first;
-        primaryPartitionConstraint = primarySplitsAndConstraint.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
-                        secondaryIndexName);
-        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-        // Must be called in this order.
-        setPrimaryRecDescAndComparators();
-        setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+	protected void init(CompiledCreateIndexStatement createIndexStmt,
+			AqlMetadataProvider metadataProvider) throws AsterixException,
+			AlgebricksException {
+		this.metadataProvider = metadataProvider;
+		dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider
+				.getDefaultDataverseName() : createIndexStmt.getDataverseName();
+		datasetName = createIndexStmt.getDatasetName();
+		secondaryIndexName = createIndexStmt.getIndexName();
+		dataset = metadataProvider.findDataset(dataverseName, datasetName);
+		if (dataset == null) {
+			throw new AsterixException("Unknown dataset " + datasetName);
+		}
+		if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+			throw new AsterixException("Cannot index an external dataset ("
+					+ datasetName + ").");
+		}
+		itemType = (ARecordType) metadataProvider.findType(
+				dataset.getDataverseName(), dataset.getItemTypeName());
+		payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+				.getSerializerDeserializer(itemType);
+		numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+		numSecondaryKeys = createIndexStmt.getKeyFields().size();
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+				.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+						dataverseName, datasetName, datasetName);
+		primaryFileSplitProvider = primarySplitsAndConstraint.first;
+		primaryPartitionConstraint = primarySplitsAndConstraint.second;
+		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+				.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+						dataverseName, datasetName, secondaryIndexName);
+		secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+		secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+		// Must be called in this order.
+		setPrimaryRecDescAndComparators();
+		setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
 
-        String numElementsHintString = dataset.getHints().get("CARDINALITY");
-        if (numElementsHintString == null) {
-            numElementsHint = DatasetCardinalityHint.DEFAULT;
-        } else {
-            numElementsHint = Long.parseLong(dataset.getHints().get("CARDINALITY"));
-        }
-    }
+		String numElementsHintString = dataset.getHints().get("CARDINALITY");
+		if (numElementsHintString == null) {
+			numElementsHint = DatasetCardinalityHint.DEFAULT;
+		} else {
+			numElementsHint = Long.parseLong(dataset.getHints().get(
+					"CARDINALITY"));
+		}
+	}
 
-    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
-        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        int numPrimaryKeys = partitioningKeys.size();
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
-        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType;
-            try {
-                keyType = itemType.getFieldType(partitioningKeys.get(i));
-            } catch (IOException e) {
-                throw new AlgebricksException(e);
-            }
-            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, true);
-            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            primaryBloomFilterKeyFields[i] = i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
-    }
+	protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+		List<String> partitioningKeys = DatasetUtils
+				.getPartitioningKeys(dataset);
+		int numPrimaryKeys = partitioningKeys.size();
+		ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+		ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+		primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+		primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+		ISerializerDeserializerProvider serdeProvider = metadataProvider
+				.getFormat().getSerdeProvider();
+		for (int i = 0; i < numPrimaryKeys; i++) {
+			IAType keyType;
+			try {
+				keyType = itemType.getFieldType(partitioningKeys.get(i));
+			} catch (IOException e) {
+				throw new AlgebricksException(e);
+			}
+			primaryRecFields[i] = serdeProvider
+					.getSerializerDeserializer(keyType);
+			primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+					.getBinaryComparatorFactory(keyType, true);
+			primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
+					.getTypeTrait(keyType);
+			primaryBloomFilterKeyFields[i] = i;
+		}
+		primaryRecFields[numPrimaryKeys] = payloadSerde;
+		primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+				.getTypeTrait(itemType);
+		primaryRecDesc = new RecordDescriptor(primaryRecFields,
+				primaryTypeTraits);
+	}
 
-    protected void setSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
-            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
-        List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
-        secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
-        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
-        secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
-        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
-        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
-        ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
-        IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
-                .getBinaryComparatorFactoryProvider();
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                    itemType, secondaryKeyFields.get(i), numPrimaryKeys);
-            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
-            IAType keyType = keyTypePair.first;
-            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
-            secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
-            secondaryBloomFilterKeyFields[i] = i;
-        }
-        // Add serializers and comparators for primary index fields.
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
-            secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
-        }
-        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
-    }
+	protected void setSecondaryRecDescAndComparators(
+			CompiledCreateIndexStatement createIndexStmt,
+			AqlMetadataProvider metadataProvider) throws AlgebricksException,
+			AsterixException {
+		List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+		secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
+		secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+				+ numPrimaryKeys];
+		secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+		ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+				+ numSecondaryKeys];
+		ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys
+				+ numPrimaryKeys];
+		ISerializerDeserializerProvider serdeProvider = metadataProvider
+				.getFormat().getSerdeProvider();
+		ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat()
+				.getTypeTraitProvider();
+		IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider
+				.getFormat().getBinaryComparatorFactoryProvider();
+		for (int i = 0; i < numSecondaryKeys; i++) {
+			secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat()
+					.getFieldAccessEvaluatorFactory(itemType,
+							secondaryKeyFields.get(i), numPrimaryKeys);
+			Pair<IAType, Boolean> keyTypePair = Index
+					.getNonNullableKeyFieldType(secondaryKeyFields.get(i),
+							itemType);
+			IAType keyType = keyTypePair.first;
+			anySecondaryKeyIsNullable = anySecondaryKeyIsNullable
+					|| keyTypePair.second;
+			ISerializerDeserializer keySerde = serdeProvider
+					.getSerializerDeserializer(keyType);
+			secondaryRecFields[i] = keySerde;
+			secondaryComparatorFactories[i] = comparatorFactoryProvider
+					.getBinaryComparatorFactory(keyType, true);
+			secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+			secondaryBloomFilterKeyFields[i] = i;
+		}
+		// Add serializers and comparators for primary index fields.
+		for (int i = 0; i < numPrimaryKeys; i++) {
+			secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc
+					.getFields()[i];
+			secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc
+					.getTypeTraits()[i];
+			secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+		}
+		secondaryRecDesc = new RecordDescriptor(secondaryRecFields,
+				secondaryTypeTraits);
+	}
 
-    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
-            AlgebricksException {
-        // Build dummy tuple containing one field with a dummy value inside.
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        try {
-            // Serialize dummy value into a field.
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-        // Add dummy field.
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                primaryPartitionConstraint);
-        return keyProviderOp;
-    }
+	protected AbstractOperatorDescriptor createDummyKeyProviderOp(
+			JobSpecification spec) throws AsterixException, AlgebricksException {
+		// Build dummy tuple containing one field with a dummy value inside.
+		ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+		DataOutput dos = tb.getDataOutput();
+		tb.reset();
+		try {
+			// Serialize dummy value into a field.
+			IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+		} catch (HyracksDataException e) {
+			throw new AsterixException(e);
+		}
+		// Add dummy field.
+		tb.addFieldEndOffset();
+		ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
+				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+				tb.getSize());
+		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+				spec, keyProviderOp, primaryPartitionConstraint);
+		return keyProviderOp;
+	}
 
-    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
-        // -Infinity
-        int[] lowKeyFields = null;
-        // +Infinity
-        int[] highKeyFields = null;
-        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
-        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
-                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
-                new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
-                                .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
-                        storageProperties.getBloomFilterFalsePositiveRate()), false,
-                NoOpOperationCallbackFactory.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primaryPartitionConstraint);
-        return primarySearchOp;
-    }
+	protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
+			JobSpecification spec) throws AlgebricksException {
+		// -Infinity
+		int[] lowKeyFields = null;
+		// +Infinity
+		int[] highKeyFields = null;
+		AsterixStorageProperties storageProperties = propertiesProvider
+				.getStorageProperties();
+		BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
+				spec, primaryRecDesc,
+				AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+				AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+				primaryFileSplitProvider, primaryRecDesc.getTypeTraits(),
+				primaryComparatorFactories, primaryBloomFilterKeyFields,
+				lowKeyFields, highKeyFields, true, true,
+				new LSMBTreeDataflowHelperFactory(
+						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+						AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+						storageProperties.getMemoryComponentPageSize(),
+						storageProperties.getMemoryComponentNumPages(),
+						storageProperties.getBloomFilterFalsePositiveRate()),
+				false, NoOpOperationCallbackFactory.INSTANCE);
+		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+				spec, primarySearchOp, primaryPartitionConstraint);
+		return primarySearchOp;
+	}
 
-    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
-            BTreeSearchOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
-        int[] outColumns = new int[numSecondaryKeyFields];
-        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            outColumns[i] = numPrimaryKeys + i + 1;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i + 1;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
-                    secondaryFieldAccessEvalFactories[i]);
-        }
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                primaryPartitionConstraint);
-        return asterixAssignOp;
-    }
+	protected AlgebricksMetaOperatorDescriptor createAssignOp(
+			JobSpecification spec, BTreeSearchOperatorDescriptor primaryScanOp,
+			int numSecondaryKeyFields) throws AlgebricksException {
+		int[] outColumns = new int[numSecondaryKeyFields];
+		int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
+		for (int i = 0; i < numSecondaryKeyFields; i++) {
+			outColumns[i] = numPrimaryKeys + i + 1;
+		}
+		int projCount = 0;
+		for (int i = 0; i < numSecondaryKeyFields; i++) {
+			projectionList[projCount++] = numPrimaryKeys + i + 1;
+		}
+		for (int i = 0; i < numPrimaryKeys; i++) {
+			projectionList[projCount++] = i;
+		}
+		IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+		for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+			sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+					secondaryFieldAccessEvalFactories[i]);
+		}
+		AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns,
+				sefs, projectionList);
+		AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(
+				spec, 1, 1, new IPushRuntimeFactory[] { assign },
+				new RecordDescriptor[] { secondaryRecDesc });
+		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+				spec, asterixAssignOp, primaryPartitionConstraint);
+		return asterixAssignOp;
+	}
 
-    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
-            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
-        int[] sortFields = new int[secondaryComparatorFactories.length];
-        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
-            sortFields[i] = i;
-        }
-        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
-        return sortOp;
-    }
+	protected ExternalSortOperatorDescriptor createSortOp(
+			JobSpecification spec,
+			IBinaryComparatorFactory[] secondaryComparatorFactories,
+			RecordDescriptor secondaryRecDesc) {
+		int[] sortFields = new int[secondaryComparatorFactories.length];
+		for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+			sortFields[i] = i;
+		}
+		ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(
+				spec, physOptConf.getMaxFramesExternalSort(), sortFields,
+				secondaryComparatorFactories, secondaryRecDesc);
+		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+				spec, sortOp, primaryPartitionConstraint);
+		return sortOp;
+	}
 
-    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
-            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
-        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
-            fieldPermutation[i] = i;
-        }
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
-                        secondaryIndexName);
-        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-                secondarySplitsAndConstraint.first, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
-                secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint,
-                dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
-                secondarySplitsAndConstraint.second);
-        return treeIndexBulkLoadOp;
-    }
+	protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(
+			JobSpecification spec, int numSecondaryKeyFields,
+			IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+			throws MetadataException, AlgebricksException {
+		int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
+		for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
+			fieldPermutation[i] = i;
+		}
+		TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+				spec, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+				AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+				secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(),
+				secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+				fieldPermutation, fillFactor, false, numElementsHint,
+				dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+				spec, treeIndexBulkLoadOp, secondaryPartitionConstraint);
+		return treeIndexBulkLoadOp;
+	}
 
-    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
-            throws AlgebricksException {
-        ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
-        NotDescriptor notDesc = new NotDescriptor();
-        IsNullDescriptor isNullDesc = new IsNullDescriptor();
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            // Access column i, and apply 'is not null'.
-            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
-            ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
-                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
-            ICopyEvaluatorFactory notEvalFactory = notDesc
-                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
-            andArgsEvalFactories[i] = notEvalFactory;
-        }
-        ICopyEvaluatorFactory selectCond = null;
-        if (numSecondaryKeyFields > 1) {
-            // Create conjunctive condition where all secondary index keys must satisfy 'is not null'.
-            AndDescriptor andDesc = new AndDescriptor();
-            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
-        } else {
-            selectCond = andArgsEvalFactories[0];
-        }
-        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
-                new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
-                null, AqlBinaryBooleanInspectorImpl.FACTORY);
-        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
-                primaryPartitionConstraint);
-        return asterixSelectOp;
-    }
+	public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(
+			JobSpecification spec, int numSecondaryKeyFields)
+			throws AlgebricksException {
+		ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
+		NotDescriptor notDesc = new NotDescriptor();
+		IsNullDescriptor isNullDesc = new IsNullDescriptor();
+		for (int i = 0; i < numSecondaryKeyFields; i++) {
+			// Access column i, and apply 'is not null'.
+			ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(
+					i);
+			ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
+					.createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
+			ICopyEvaluatorFactory notEvalFactory = notDesc
+					.createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
+			andArgsEvalFactories[i] = notEvalFactory;
+		}
+		ICopyEvaluatorFactory selectCond = null;
+		if (numSecondaryKeyFields > 1) {
+			// Create conjunctive condition where all secondary index keys must
+			// satisfy 'is not null'.
+			AndDescriptor andDesc = new AndDescriptor();
+			selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+		} else {
+			selectCond = andArgsEvalFactories[0];
+		}
+		StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
+				new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+						selectCond), null,
+				AqlBinaryBooleanInspectorImpl.FACTORY);
+		AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(
+				spec, 1, 1, new IPushRuntimeFactory[] { select },
+				new RecordDescriptor[] { secondaryRecDesc });
+		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+				spec, asterixSelectOp, primaryPartitionConstraint);
+		return asterixSelectOp;
+	}
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 366e247..b84f322 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -157,7 +157,7 @@
         ILocalResourceMetadata localResourceMetadata = new LSMInvertedIndexLocalResourceMetadata(invListsTypeTraits,
                 primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory,
                 storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
-                isPartitioned);
+                isPartitioned, secondaryFileSplitProvider.getFileSplits());
         ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                 localResourceMetadata, LocalResource.LSMInvertedIndexResource);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 89a59e8..16e65f9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -65,7 +65,7 @@
                 secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, primaryComparatorFactories,
                 valueProviderFactories, RTreePolicyType.RTREE, AqlMetadataProvider.proposeLinearizer(keyType,
                         secondaryComparatorFactories.length), storageProperties.getMemoryComponentPageSize(),
-                storageProperties.getMemoryComponentNumPages());
+                storageProperties.getMemoryComponentNumPages(), secondaryFileSplitProvider.getFileSplits());
         ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                 localResourceMetadata, LocalResource.LSMRTreeResource);
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index f16c09a..d8fd7c2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -358,12 +358,14 @@
             AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
             ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
                     comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(),
-                    storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages());
+                    storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+                    runtimeContext.getMetaDataIODeviceId());
             ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
-            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
-                    .getPath(), 0));
+            localResourceRepository.insert(
+                    localResourceFactory.createLocalResource(resourceID, file.getFile().getPath(), 0),
+                    runtimeContext.getMetaDataIODeviceId());
         } else {
             resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
             lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index ef292cd..cde0cd9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -418,7 +418,8 @@
                 adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
                 if (adapterFactoryClassname != null) {
                 } else {
-                    // adapterName has been provided as a fully qualified classname 
+                    // adapterName has been provided as a fully qualified
+                    // classname
                     adapterFactoryClassname = adapterName;
                 }
                 adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
@@ -768,9 +769,11 @@
                 numElementsHint = Long.parseLong(dataset.getHints().get("CARDINALITY"));
             }
 
-            //TODO
-            //figure out the right behavior of the bulkload and then give the right callback
-            //(ex. what's the expected behavior when there is an error during bulkload?)
+            // TODO
+            // figure out the right behavior of the bulkload and then give the
+            // right callback
+            // (ex. what's the expected behavior when there is an error during
+            // bulkload?)
             TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                     appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
@@ -829,7 +832,7 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataSource.getId().getDataverseName(), datasetName, indexName);
 
-            //prepare callback
+            // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
@@ -1022,7 +1025,7 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataverseName, datasetName, indexName);
 
-            //prepare callback
+            // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[primaryKeys.size()];
@@ -1137,7 +1140,8 @@
             tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
             tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
             if (isPartitioned) {
-                // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+                // The partitioning field is hardcoded to be a short *without*
+                // an Asterix type tag.
                 tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
                 tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
             }
@@ -1148,7 +1152,7 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataverseName, datasetName, indexName);
 
-            //prepare callback
+            // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[primaryKeys.size()];
@@ -1243,7 +1247,7 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataverseName, datasetName, indexName);
 
-            //prepare callback
+            // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numPrimaryKeys];
@@ -1337,8 +1341,13 @@
                 continue;
             }
             for (int i = 0; i < nodeStores.length; i++) {
-                File f = new File(nodeStores[i] + File.separator + relPathFile);
-                splits.add(new FileSplit(node, new FileReference(f)));
+                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+                for (int j = 0; j < nodeStores.length; j++) {
+                    for (int k = 0; k < numIODevices; k++) {
+                        File f = new File(nodeStores[j] + File.separator + relPathFile);
+                        splits.add(new FileSplit(node, new FileReference(f), k));
+                    }
+                }
             }
         }
         return splits.toArray(new FileSplit[] {});
@@ -1419,21 +1428,6 @@
         return dataverseName + File.separator + fileName;
     }
 
-    public Pair<IFileSplitProvider, IFileSplitProvider> getInvertedIndexFileSplitProviders(
-            IFileSplitProvider splitProvider) {
-        int numSplits = splitProvider.getFileSplits().length;
-        FileSplit[] btreeSplits = new FileSplit[numSplits];
-        FileSplit[] invListsSplits = new FileSplit[numSplits];
-        for (int i = 0; i < numSplits; i++) {
-            String nodeName = splitProvider.getFileSplits()[i].getNodeName();
-            String path = splitProvider.getFileSplits()[i].getLocalFile().getFile().getPath();
-            btreeSplits[i] = new FileSplit(nodeName, path + "_$btree");
-            invListsSplits[i] = new FileSplit(nodeName, path + "_$invlists");
-        }
-        return new Pair<IFileSplitProvider, IFileSplitProvider>(new ConstantFileSplitProvider(btreeSplits),
-                new ConstantFileSplitProvider(invListsSplits));
-    }
-
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
         try {
             return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 7ab0d10..c59d854 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -6,6 +6,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
@@ -27,14 +28,27 @@
     private final int[] bloomFilterKeyFields;
     private final int memPageSize;
     private final int memNumPages;
+    private FileSplit[] fileSplits;
+    private int ioDeviceID;
 
     public LSMBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
-            int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages) {
+            int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages, FileSplit[] fileSplits) {
         this.typeTraits = typeTraits;
         this.cmpFactories = cmpFactories;
         this.bloomFilterKeyFields = bloomFilterKeyFields;
         this.memPageSize = memPageSize;
         this.memNumPages = memNumPages;
+        this.fileSplits = fileSplits;
+    }
+
+    public LSMBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
+            int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages, int ioDeviceID) {
+        this.typeTraits = typeTraits;
+        this.cmpFactories = cmpFactories;
+        this.bloomFilterKeyFields = bloomFilterKeyFields;
+        this.memPageSize = memPageSize;
+        this.memNumPages = memNumPages;
+        this.ioDeviceID = ioDeviceID;
     }
 
     @Override
@@ -45,13 +59,13 @@
                 memNumPages, new TransientFileMapManager());
         ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
         IInMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
-        LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager,
-                runtimeContextProvider.getIOManager(), file, runtimeContextProvider.getBufferCache(),
-                runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
-                runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(),
-                runtimeContextProvider.getLSMBTreeOperationTrackerFactory(),
-                runtimeContextProvider.getLSMIOScheduler(),
-                runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(), partition);
+        LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, runtimeContextProvider
+                .getIOManager(), file, runtimeContextProvider.getBufferCache(), runtimeContextProvider
+                .getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields, runtimeContextProvider
+                .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(), runtimeContextProvider
+                .getLSMBTreeOperationTrackerFactory(), runtimeContextProvider.getLSMIOScheduler(),
+                runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(), fileSplits == null ? ioDeviceID
+                        : fileSplits[partition].getIODeviceId());
         return lsmBTree;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 3bcb747..c655770 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -4,6 +4,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -28,11 +29,12 @@
     private final int memPageSize;
     private final int memNumPages;
     private final boolean isPartitioned;
+    private final FileSplit[] fileSplits;
 
     public LSMInvertedIndexLocalResourceMetadata(ITypeTraits[] invListTypeTraits,
             IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
             IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory, int memPageSize,
-            int memNumPages, boolean isPartitioned) {
+            int memNumPages, boolean isPartitioned, FileSplit[] fileSplits) {
         this.invListTypeTraits = invListTypeTraits;
         this.invListCmpFactories = invListCmpFactories;
         this.tokenTypeTraits = tokenTypeTraits;
@@ -41,6 +43,7 @@
         this.memPageSize = memPageSize;
         this.memNumPages = memNumPages;
         this.isPartitioned = isPartitioned;
+        this.fileSplits = fileSplits;
     }
 
     @Override
@@ -62,7 +65,8 @@
                         runtimeContextProvider.getLSMMergePolicy(),
                         runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
                         runtimeContextProvider.getLSMIOScheduler(),
-                        runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(), partition);
+                        runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
+                        fileSplits[partition].getIODeviceId());
             } else {
                 return InvertedIndexUtils.createLSMInvertedIndex(memBufferCache, memFreePageManager,
                         runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
@@ -72,7 +76,8 @@
                         runtimeContextProvider.getLSMMergePolicy(),
                         runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
                         runtimeContextProvider.getLSMIOScheduler(),
-                        runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(), partition);
+                        runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
+                        fileSplits[partition].getIODeviceId());
             }
         } catch (IndexException e) {
             throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index ff26c54..6c806c8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
@@ -33,11 +34,12 @@
     private final ILinearizeComparatorFactory linearizeCmpFactory;
     private final int memPageSize;
     private final int memNumPages;
+    private final FileSplit[] fileSplits;
 
     public LSMRTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] rtreeCmpFactories,
             IBinaryComparatorFactory[] btreeCmpFactories, IPrimitiveValueProviderFactory[] valueProviderFactories,
             RTreePolicyType rtreePolicyType, ILinearizeComparatorFactory linearizeCmpFactory, int memPageSize,
-            int memNumPages) {
+            int memNumPages, FileSplit[] fileSplits) {
         this.typeTraits = typeTraits;
         this.rtreeCmpFactories = rtreeCmpFactories;
         this.btreeCmpFactories = btreeCmpFactories;
@@ -46,6 +48,7 @@
         this.linearizeCmpFactory = linearizeCmpFactory;
         this.memPageSize = memPageSize;
         this.memNumPages = memNumPages;
+        this.fileSplits = fileSplits;
     }
 
     @Override
@@ -66,7 +69,8 @@
                     runtimeContextProvider.getLSMMergePolicy(),
                     runtimeContextProvider.getLSMRTreeOperationTrackerFactory(),
                     runtimeContextProvider.getLSMIOScheduler(),
-                    runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory, partition);
+                    runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory,
+                    fileSplits[partition].getIODeviceId());
         } catch (TreeIndexException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 01dce6c..275e42e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -28,8 +28,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
 import edu.uci.ics.hyracks.storage.common.file.LocalResource;
 import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
@@ -37,25 +37,29 @@
 public class PersistentLocalResourceRepository implements ILocalResourceRepository {
 
     private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
-    private final String mountPoint;
+    private final String[] mountPoints;
     private static final String ROOT_METADATA_DIRECTORY = "asterix_root_metadata/";
     private static final String ROOT_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata_";
     private static final long ROOT_LOCAL_RESOURCE_ID = -4321;
     private static final String METADATA_FILE_NAME = ".metadata";
     private Map<String, LocalResource> name2ResourceMap = new HashMap<String, LocalResource>();
     private Map<Long, LocalResource> id2ResourceMap = new HashMap<Long, LocalResource>();
-    private String rootMetadataFileName;
-    private String rootDir;
+    private final int numIODevices;
 
-    public PersistentLocalResourceRepository(String mountPoint) throws HyracksDataException {
-        File mountPointDir = new File(mountPoint);
-        if (!mountPointDir.exists()) {
-            throw new HyracksDataException(mountPointDir.getAbsolutePath() + "doesn't exist.");
-        }
-        if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
-            this.mountPoint = new String(mountPoint + System.getProperty("file.separator"));
-        } else {
-            this.mountPoint = new String(mountPoint);
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices) throws HyracksDataException {
+        numIODevices = devices.size();
+        this.mountPoints = new String[numIODevices];
+        for (int i = 0; i < numIODevices; i++) {
+            String mountPoint = devices.get(i).getPath().getPath();
+            File mountPointDir = new File(mountPoint);
+            if (!mountPointDir.exists()) {
+                throw new HyracksDataException(mountPointDir.getAbsolutePath() + "doesn't exist.");
+            }
+            if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
+                mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
+            } else {
+                mountPoints[i] = new String(mountPoint);
+            }
         }
     }
 
@@ -64,53 +68,40 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Initializing local resource repository ... ");
         }
-        LocalResource rootLocalResource = null;
 
-        //#. if the rootMetadataFile doesn't exist, create it and return.
-        rootMetadataFileName = new String(mountPoint + ROOT_METADATA_DIRECTORY + ROOT_METADATA_FILE_NAME_PREFIX
-                + nodeId);
-        File rootMetadataFile = new File(rootMetadataFileName);
         if (isNewUniverse) {
-            File rootMetadataDir = new File(mountPoint + ROOT_METADATA_DIRECTORY);
-            if (!rootMetadataDir.exists()) {
-                rootMetadataDir.mkdir();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
+            //#. if the rootMetadataFile doesn't exist, create it and return.
+            for (int i = 0; i < numIODevices; i++) {
+                String rootMetadataFileName = new String(mountPoints[i] + ROOT_METADATA_DIRECTORY
+                        + ROOT_METADATA_FILE_NAME_PREFIX + nodeId);
+                File rootMetadataFile = new File(rootMetadataFileName);
+
+                File rootMetadataDir = new File(mountPoints[i] + ROOT_METADATA_DIRECTORY);
+                if (!rootMetadataDir.exists()) {
+                    rootMetadataDir.mkdir();
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
+                    }
                 }
-            }
 
-            rootMetadataFile.delete();
-            if (rootDir.startsWith(System.getProperty("file.separator"))) {
-                this.rootDir = new String(mountPoint + rootDir.substring(System.getProperty("file.separator").length()));
-            } else {
-                this.rootDir = new String(mountPoint + rootDir);
-            }
-            rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0, this.rootDir);
-            insert(rootLocalResource);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
-            }
-            
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Completed the initialization of the local resource repository");
-            }
-            return;
-        }
+                rootMetadataFile.delete();
+                String mountedRootDir;
+                if (rootDir.startsWith(System.getProperty("file.separator"))) {
+                    mountedRootDir = new String(mountPoints[i]
+                            + rootDir.substring(System.getProperty("file.separator").length()));
+                } else {
+                    mountedRootDir = new String(mountPoints[i] + rootDir);
+                }
+                LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
+                        mountedRootDir);
+                insert(rootLocalResource, i);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
+                }
 
-        //#. if the rootMetadataFile exists, read it and set this.rootDir.
-        rootLocalResource = readLocalResource(rootMetadataFile);
-        this.rootDir = (String) rootLocalResource.getResourceObject();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("The root directory of the local resource repository is " + this.rootDir);
-        }
-
-        //#. load all local resources. 
-        File rootDirFile = new File(this.rootDir);
-        if (!rootDirFile.exists()) {
-            //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("The root directory of the local resource repository doesn't exist: there is no local resource.");
-                LOGGER.info("Completed the initialization of the local resource repository");
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Completed the initialization of the local resource repository");
+                }
             }
             return;
         }
@@ -125,27 +116,50 @@
             }
         };
 
-        long maxResourceId = 0;
-        File[] dataverseFileList = rootDirFile.listFiles();
-        if (dataverseFileList == null) {
-            throw new HyracksDataException("Metadata dataverse doesn't exist.");
-        }
-        for (File dataverseFile : dataverseFileList) {
-            if (dataverseFile.isDirectory()) {
-                File[] indexFileList = dataverseFile.listFiles();
-                if (indexFileList != null) {
-                    for (File indexFile : indexFileList) {
-                        if (indexFile.isDirectory()) {
-                            File[] metadataFiles = indexFile.listFiles(filter);
-                            if (metadataFiles != null) {
-                                for (File metadataFile : metadataFiles) {
-                                    LocalResource localResource = readLocalResource(metadataFile);
-                                    id2ResourceMap.put(localResource.getResourceId(), localResource);
-                                    name2ResourceMap.put(localResource.getResourceName(), localResource);
-                                    maxResourceId = Math.max(localResource.getResourceId(), maxResourceId);
-                                    if (LOGGER.isLoggable(Level.INFO)) {
-                                        LOGGER.info("loaded local resource - [id: " + localResource.getResourceId()
-                                                + ", name: " + localResource.getResourceName() + "]");
+        for (int i = 0; i < numIODevices; i++) {
+            String rootMetadataFileName = new String(mountPoints[i] + ROOT_METADATA_DIRECTORY
+                    + ROOT_METADATA_FILE_NAME_PREFIX + nodeId);
+            File rootMetadataFile = new File(rootMetadataFileName);
+            //#. if the rootMetadataFile exists, read it and set this.rootDir.
+            LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
+            String mountedRootDir = (String) rootLocalResource.getResourceObject();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("The root directory of the local resource repository is " + mountedRootDir);
+            }
+
+            //#. load all local resources. 
+            File rootDirFile = new File(mountedRootDir);
+            if (!rootDirFile.exists()) {
+                //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("The root directory of the local resource repository doesn't exist: there is no local resource.");
+                    LOGGER.info("Completed the initialization of the local resource repository");
+                }
+                continue;
+            }
+
+            long maxResourceId = 0;
+            File[] dataverseFileList = rootDirFile.listFiles();
+            if (dataverseFileList == null) {
+                throw new HyracksDataException("Metadata dataverse doesn't exist.");
+            }
+            for (File dataverseFile : dataverseFileList) {
+                if (dataverseFile.isDirectory()) {
+                    File[] indexFileList = dataverseFile.listFiles();
+                    if (indexFileList != null) {
+                        for (File indexFile : indexFileList) {
+                            if (indexFile.isDirectory()) {
+                                File[] metadataFiles = indexFile.listFiles(filter);
+                                if (metadataFiles != null) {
+                                    for (File metadataFile : metadataFiles) {
+                                        LocalResource localResource = readLocalResource(metadataFile);
+                                        id2ResourceMap.put(localResource.getResourceId(), localResource);
+                                        name2ResourceMap.put(localResource.getResourceName(), localResource);
+                                        maxResourceId = Math.max(localResource.getResourceId(), maxResourceId);
+                                        if (LOGGER.isLoggable(Level.INFO)) {
+                                            LOGGER.info("loaded local resource - [id: " + localResource.getResourceId()
+                                                    + ", name: " + localResource.getResourceName() + "]");
+                                        }
                                     }
                                 }
                             }
@@ -153,11 +167,11 @@
                     }
                 }
             }
-        }
-        resourceIdFactory.initId(maxResourceId + 1);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("The resource id factory is intialized with the value: " + (maxResourceId + 1));
-            LOGGER.info("Completed the initialization of the local resource repository");
+            resourceIdFactory.initId(maxResourceId + 1);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("The resource id factory is intialized with the value: " + (maxResourceId + 1));
+                LOGGER.info("Completed the initialization of the local resource repository");
+            }
         }
     }
 
@@ -172,7 +186,7 @@
     }
 
     @Override
-    public synchronized void insert(LocalResource resource) throws HyracksDataException {
+    public synchronized void insert(LocalResource resource, int ioDeviceId) throws HyracksDataException {
         long id = resource.getResourceId();
 
         if (id2ResourceMap.containsKey(id)) {
@@ -186,8 +200,10 @@
 
         FileOutputStream fos = null;
         ObjectOutputStream oosToFos = null;
+
         try {
-            fos = new FileOutputStream(getFileName(mountPoint, resource.getResourceName(), resource.getResourceId()));
+            fos = new FileOutputStream(getFileName(mountPoints[ioDeviceId], resource.getResourceName(),
+                    resource.getResourceId()));
             oosToFos = new ObjectOutputStream(fos);
             oosToFos.writeObject(resource);
             oosToFos.flush();
@@ -212,26 +228,26 @@
     }
 
     @Override
-    public synchronized void deleteResourceById(long id) throws HyracksDataException {
+    public synchronized void deleteResourceById(long id, int ioDeviceId) throws HyracksDataException {
         LocalResource resource = id2ResourceMap.get(id);
         if (resource == null) {
             throw new HyracksDataException("Resource doesn't exist");
         }
         id2ResourceMap.remove(id);
         name2ResourceMap.remove(resource.getResourceName());
-        File file = new File(getFileName(mountPoint, resource.getResourceName(), resource.getResourceId()));
+        File file = new File(getFileName(mountPoints[ioDeviceId], resource.getResourceName(), resource.getResourceId()));
         file.delete();
     }
 
     @Override
-    public synchronized void deleteResourceByName(String name) throws HyracksDataException {
+    public synchronized void deleteResourceByName(String name, int ioDeviceId) throws HyracksDataException {
         LocalResource resource = name2ResourceMap.get(name);
         if (resource == null) {
             throw new HyracksDataException("Resource doesn't exist");
         }
         id2ResourceMap.remove(resource.getResourceId());
         name2ResourceMap.remove(name);
-        File file = new File(getFileName(mountPoint, resource.getResourceName(), resource.getResourceId()));
+        File file = new File(getFileName(mountPoints[ioDeviceId], resource.getResourceName(), resource.getResourceId()));
         file.delete();
     }
 
@@ -286,4 +302,4 @@
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index c7efca5..f6847f9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -14,11 +14,8 @@
  */
 package edu.uci.ics.asterix.transaction.management.resource;
 
-import java.util.List;
-
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 
@@ -31,7 +28,6 @@
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        List<IODeviceHandle> devices = ioManager.getIODevices();
-        return new PersistentLocalResourceRepository(devices.get(0).getPath().getPath());
+        return new PersistentLocalResourceRepository(ioManager.getIODevices());
     }
-}
+}
\ No newline at end of file