fix merge issues
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 144131f..baf16de 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -112,11 +112,12 @@
 			// If any of the secondary fields are nullable, then add a select op that filters nulls.
 			AlgebricksMetaOperatorDescriptor selectOp = null;
 			if (anySecondaryKeyIsNullable) {
-				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys,RIDScanOpAndConstraints.second);
+				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
 			}
 
 			// Sort by secondary keys.
 			ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc,RIDScanOpAndConstraints.second);
+			AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
 			// Create secondary BTree bulk load op.
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
                 spec,
@@ -199,3 +200,4 @@
 		}
 	}
 }
+
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 60a4451..5da336f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -19,6 +19,9 @@
 import java.io.IOException;
 import java.util.List;
 
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -28,22 +31,18 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
@@ -51,7 +50,6 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
 import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
@@ -85,6 +83,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -92,6 +91,7 @@
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 
+
 @SuppressWarnings("rawtypes")
 // TODO: We should eventually have a hierarchy of classes that can create all
 // possible index job specs,
@@ -109,11 +109,11 @@
     protected ISerializerDeserializer payloadSerde;
     protected IFileSplitProvider primaryFileSplitProvider;
     protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+    protected List<String> secondaryKeyFields;
     protected IFileSplitProvider secondaryFileSplitProvider;
     protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
     protected String secondaryIndexName;
     protected boolean anySecondaryKeyIsNullable = false;
-
     protected long numElementsHint;
     protected IBinaryComparatorFactory[] primaryComparatorFactories;
     protected int[] primaryBloomFilterKeyFields;
@@ -122,7 +122,6 @@
     protected int[] secondaryBloomFilterKeyFields;
     protected RecordDescriptor secondaryRecDesc;
     protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
-
     protected IAsterixPropertiesProvider propertiesProvider;
 
     // Prevent public construction. Should be created via createIndexCreator().
@@ -166,37 +165,68 @@
     public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
 
     protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
-            throws AsterixException, AlgebricksException {
-        this.metadataProvider = metadataProvider;
-        dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
-                : createIndexStmt.getDataverseName();
-        datasetName = createIndexStmt.getDatasetName();
-        secondaryIndexName = createIndexStmt.getIndexName();
-        dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException("Unknown dataset " + datasetName);
+                        throws AsterixException, AlgebricksException {
+                this.metadataProvider = metadataProvider;
+                dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+                                : createIndexStmt.getDataverseName();
+                datasetName = createIndexStmt.getDatasetName();
+                secondaryIndexName = createIndexStmt.getIndexName();
+                dataset = metadataProvider.findDataset(dataverseName, datasetName);
+                if (dataset == null) {
+                        throw new AsterixException("Unknown dataset " + datasetName);
+                }
+                if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+                        //get external dataset details
+                        ExternalDatasetDetails edsd = ((ExternalDatasetDetails)dataset.getDatasetDetails());
+                        //get adapter name
+                        String adapter = edsd.getAdapter();
+                        //if not an hdfs adapter, throw an exception
+                        if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName()))
+                        {
+                                throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
+                        }
+                        //get the item type
+                        ARecordType externalItemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+                        //number of primary keys here depends on the file input, 3 for rcfiles and 2 for text and sequence files.
+                        numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
+                        itemType = createExternalItemTypeWithRID(externalItemType);
+                        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+                        numSecondaryKeys = createIndexStmt.getKeyFields().size();
+                        //splits and constraints <--They don't exist-->
+                        primaryFileSplitProvider = null;
+                        primaryPartitionConstraint = null;
+                        //create secondary split and constraints
+                        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                                        .splitProviderAndPartitionConstraintsForExternalDataset(dataverseName, datasetName,
+                                                        secondaryIndexName);
+                        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+                        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+                        // Must be called in this order.
+                        setExternalRIDDescAndComparators();
+                        setExternalSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+                        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+                }
+                else
+                {
+                        itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+                        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+                        numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+                        numSecondaryKeys = createIndexStmt.getKeyFields().size();
+                        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                                        .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
+                        primaryFileSplitProvider = primarySplitsAndConstraint.first;
+                        primaryPartitionConstraint = primarySplitsAndConstraint.second;
+                        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                                        .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+                                                        secondaryIndexName);
+                        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+                        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+                        // Must be called in this order.
+                        setPrimaryRecDescAndComparators();
+                        setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+                        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+                }
         }
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
-        }
-        itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
-        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-        numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-        numSecondaryKeys = createIndexStmt.getKeyFields().size();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
-        primaryFileSplitProvider = primarySplitsAndConstraint.first;
-        primaryPartitionConstraint = primarySplitsAndConstraint.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
-                        secondaryIndexName);
-        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-        // Must be called in this order.
-        setPrimaryRecDescAndComparators();
-        setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
-        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
-    }
 
     protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
         List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
@@ -280,23 +310,235 @@
         return keyProviderOp;
     }
 
+protected ARecordType createExternalItemTypeWithRID(
+                        ARecordType externalItemType) throws AsterixException {
+
+                String[] fieldsNames = new String[externalItemType.getFieldNames().length+numPrimaryKeys];
+                IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length+numPrimaryKeys];
+
+                //add RID fields names and types
+                if(AqlMetadataProvider.isOptimizeExternalIndexes())
+                {
+                        fieldsNames[0] = "_file-number";
+                        fieldsTypes[0] = BuiltinType.AINT32;
+                }
+                else
+                {
+                        fieldsNames[0] = "_file-name";
+                        fieldsTypes[0] = BuiltinType.ASTRING;
+                }
+                fieldsNames[1] = "_byte-location";
+                fieldsTypes[1] = BuiltinType.AINT64;
+                if(numPrimaryKeys == 3)
+                {       
+                        //add the row number for rc files
+                        fieldsNames[2] = "_row-number";
+                        fieldsTypes[2] = BuiltinType.AINT32;
+                }
+                
+                //add the original fields names and types
+                for(int i=0; i < externalItemType.getFieldNames().length; i++)
+                {
+                        fieldsNames[i+numPrimaryKeys] = externalItemType.getFieldNames()[i];
+                        fieldsTypes[i+numPrimaryKeys] = externalItemType.getFieldTypes()[i];
+                }
+                return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+        }
+
+        protected void setExternalRIDDescAndComparators() throws AlgebricksException {
+
+                ISerializerDeserializer[] externalRecFields = new ISerializerDeserializer[itemType.getFieldNames().length];
+                ITypeTraits[] externalTypeTraits = new ITypeTraits[itemType.getFieldNames().length];
+
+                primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+                primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+                ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+                
+                if(AqlMetadataProvider.isOptimizeExternalIndexes())
+                {
+                        primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
+                }
+                else
+                {
+                        primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
+                }
+                primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true);
+
+                primaryBloomFilterKeyFields[0]=0;
+                primaryBloomFilterKeyFields[1]=1;
+
+                if(numPrimaryKeys == 3)
+                {
+                        primaryComparatorFactories[2] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
+                        primaryBloomFilterKeyFields[2]=2;
+                }
+
+                for(int i=0; i < itemType.getFieldNames().length; i++)
+                {
+                        externalRecFields[i] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[i]); 
+                        externalTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType.getFieldTypes()[i]);
+                }
+                primaryRecDesc = new RecordDescriptor(externalRecFields, externalTypeTraits);
+        }
+
+protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+                        AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+                secondaryKeyFields = createIndexStmt.getKeyFields();
+                secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys+ numPrimaryKeys];
+                secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+                secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+                ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+                ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+                ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+                ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
+                IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
+                                .getBinaryComparatorFactoryProvider();
+
+                for (int i = 0; i < numSecondaryKeys; i++) {
+                        secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                                        itemType, secondaryKeyFields.get(i), 0);
+                        Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+                        IAType keyType = keyTypePair.first;
+                        anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+                        ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+                        secondaryRecFields[i] = keySerde;
+                        secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+                        secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+                        secondaryBloomFilterKeyFields[i] = i;
+                }
+
+                if(AqlMetadataProvider.isOptimizeExternalIndexes())
+                {
+                        secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                                itemType, "_file-number", 0);
+                }
+                else
+                {
+                        secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                                        itemType, "_file-name", 0);
+                }
+                secondaryFieldAccessEvalFactories[numSecondaryKeys+1] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                                itemType, "_byte-location", 0);
+                if(numPrimaryKeys == 3)
+                {
+                        secondaryFieldAccessEvalFactories[numSecondaryKeys+2] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+                                        itemType, "_row-number", 0);
+                }
+
+                for (int i = 0; i < numPrimaryKeys; i++) {
+                        secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+                        secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+                        secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+                }
+                secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+        }
+
+protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(JobSpecification spec) throws Exception {
+                Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
+                                indexingOpAndConstraints.second);
+                return indexingOpAndConstraints;
+        }
+
+protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec) throws AlgebricksException {
+                int[] outColumns = new int[numSecondaryKeys + numPrimaryKeys];
+                int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+                for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+                        outColumns[i] = i;
+                        projectionList[i] = i;
+                }
+
+                IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+                        sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+                                        secondaryFieldAccessEvalFactories[i]);
+                }
+                AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+                AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+                return asterixAssignOp;
+        }
+
+        protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+                        IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc,
+                        AlgebricksPartitionConstraint partitionConstraints) {
+                int[] sortFields = new int[secondaryComparatorFactories.length];
+                for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+                        sortFields[i] = i;
+                }
+                ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+                                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, partitionConstraints);
+                return sortOp;
+        }
+
+protected ARecordType createSecondaryItemType(ARecordType externalItemType, boolean isRCFile) throws AsterixException
+        {
+
+                String[] fieldsNames = new String[numSecondaryKeys+numPrimaryKeys];
+                IAType[] fieldsTypes = new IAType[numSecondaryKeys+numPrimaryKeys];
+
+                //first create the secondary index fields
+                for(int i=0; i<numSecondaryKeys; i++)
+                {
+                        fieldsNames[i] = secondaryKeyFields.get(i);
+                        try {
+                                fieldsTypes[i] = externalItemType.getFieldType(fieldsNames[i]);
+                        } catch (IOException e) {
+                                // secondary key field missing from the external item type
+                                throw new AsterixException(e);
+                        }
+                }
+
+                //second add RID fields (File name or number and byte location)
+                if(AqlMetadataProvider.isOptimizeExternalIndexes())
+                {
+                        fieldsNames[numSecondaryKeys] = "_file-number";
+                        fieldsTypes[numSecondaryKeys] = BuiltinType.AINT32;
+                }
+                else
+                {
+                        fieldsNames[numSecondaryKeys] = "_file-name";
+                        fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+                }
+                fieldsNames[numSecondaryKeys+1] = "_byte-location";
+                fieldsTypes[numSecondaryKeys+1] = BuiltinType.AINT64;
+
+                if(isRCFile)
+                {
+                        fieldsNames[numSecondaryKeys+2] = "_row-number";
+                        fieldsTypes[numSecondaryKeys+2] = BuiltinType.AINT32;
+                }
+
+                //return type
+                return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+        }
+
     protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
         // -Infinity
         int[] lowKeyFields = null;
         // +Infinity
         int[] highKeyFields = null;
+        ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+        JobId jobId = JobIdFactory.generateJobId();
+        metadataProvider.setJobId(jobId);
+        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+
+        ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+                jobId, dataset.getDatasetId(), primaryBloomFilterKeyFields, txnSubsystemProvider,
+                ResourceType.LSM_BTREE);
         AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
                 primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
-                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
-                                .getBloomFilterFalsePositiveRate()), false,
-                NoOpOperationCallbackFactory.INSTANCE);
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                 primaryPartitionConstraint);
         return primarySearchOp;
@@ -349,11 +591,10 @@
             fieldPermutation[i] = i;
         }
         TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
-                AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, secondaryFileSplitProvider,
-                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
-                fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint, false,
+                dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;
@@ -392,3 +633,4 @@
         return asterixSelectOp;
     }
 }
+
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index c91903f..6ce694c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -228,7 +228,7 @@
 			// If any of the secondary fields are nullable, then add a select op that filters nulls.
 			AlgebricksMetaOperatorDescriptor selectOp = null;
 			if (anySecondaryKeyIsNullable) {
-				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys,RIDScanOpAndConstraints.second);
+				selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
 			}
 
 			// Create secondary RTree bulk load op.
@@ -237,13 +237,13 @@
 					spec,
 					numNestedSecondaryKeyFields,
 					new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-							primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-							AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-							AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-							AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-							AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
-									keyType, secondaryComparatorFactories.length), storageProperties
-									.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+	                        primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+	                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+	                                LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+	                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+	                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+	                                keyType, secondaryComparatorFactories.length), storageProperties
+	                                .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
 			// Connect the operators.
 			// Create a hash partitioning connector
 			ExternalDatasetDetails edsd = (ExternalDatasetDetails)dataset.getDatasetDetails();
@@ -322,3 +322,4 @@
 		}
     }
 }
+
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 6509da4..81ce4f8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.asterix.common.transactions.DatasetId;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -1250,6 +1249,22 @@
 			throw new MetadataException(e);
 		}
 	}
+
+	@Override
+	public void addExternalDatasetFile(JobId jobId, ExternalFile externalFile)
+			throws MetadataException, RemoteException {
+		try {
+			// Insert into the 'externalFiles' dataset.
+			ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(true);
+			ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile);
+			insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple);
+		} catch (TreeIndexDuplicateKeyException e) {
+			throw new MetadataException("An external file with number " + externalFile.getFileNumber()
+					+ " already exists in dataset '" + externalFile.getDatasetName() + "' in dataverse '" + externalFile.getDataverseName() + "'.", e);
+		} catch (Exception e) {
+			throw new MetadataException(e);
+		}
+	}
 
 
 	@Override
@@ -1257,3 +1272,4 @@
 		return DatasetIdFactory.getMostRecentDatasetId();
 	}
 }
+
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index f0c3a63..078b340d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -178,22 +178,6 @@
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
     private static Scheduler hdfsScheduler;
 
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
-    }
-
-    public void setConfig(Map<String, String> config) {
-        this.config = config;
-    }
-
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
-    public Map<String, String> getConfig() {
-        return config;
-    }
-
     public AqlMetadataProvider(Dataverse defaultDataverse) {
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
@@ -780,15 +764,17 @@
 				}
 				ISearchOperationCallbackFactory searchCallbackFactory = null;
 				searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
-				AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+				AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
 				BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-						appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
-						typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
-						lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
-								new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
-								AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, rtcProvider,
-								rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
-								searchCallbackFactory);
+						appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
+						typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+						lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
+								new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
+								isSecondary ? new SecondaryIndexOperationTrackerProvider(
+										LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
+										: new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
+								rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
+								searchCallbackFactory);
 				return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
 			} catch (MetadataException me) {
 				throw new AlgebricksException(me);
@@ -936,10 +922,10 @@
                     typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(
                             valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
                             new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
                                     nestedKeyType.getTypeTag(), comparatorFactories.length),
                             storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
@@ -1162,10 +1148,10 @@
                     appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
                     new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
-                            AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                            AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
-                            AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
                                     .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
 
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1924,3 +1910,4 @@
     }
 
 }
+