add R-Tree support for external dataset indexes and external index optimizations
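
Secondary R-tree indexes can now be built on external datasets (HDFS and Hive
adapters), and an optional optimization for external indexes replaces the string
file name in each RID with a compact AINT32 file number. When the optimization is
on, the file-number-to-name mappings are stored in metadata at index build time
(IndexOperations.addExternalDatasetFilesToMetadata) and handed to the
access-by-RID adapters as a HashMap<Integer, String>. Secondary-to-primary
lookups on external datasets are routed to ExternalDataAccessByRIDOperator
instead of a primary-index unnest, and the R-tree bulk-load pipeline
hash-partitions the assign output on the RID fields so each record lands on the
partition that bulk loads it.
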
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index b8125aa..8af4ac1 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -44,6 +45,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataAccessByRIDOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 
@@ -196,10 +198,19 @@
         UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset, recordType,
                 chosenIndex, assignSearchKeys, jobGenParams, context, false, retainInput);
         // Generate the rest of the upstream plan which feeds the search results into the primary index.
-        UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset,
-                recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            ExternalDataAccessByRIDOperator externalDataAccessOp = AccessMethodUtils.createExternalDataAccessByRIDUnnestMap(
+                    dataSourceScan, dataset, recordType, secondaryIndexUnnestOp, context, chosenIndex);
+            return externalDataAccessOp;
+        } else {
+            UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset,
+                    recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
 
-        return primaryIndexUnnestOp;
+            return primaryIndexUnnestOp;
+        }
     }
 
     @Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 2198d67..855ee19 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -590,6 +590,18 @@
             //#. create the index artifact in NC.
             runJob(hcc, spec, true);
 
+            //if the dataset is external and external-index optimization is on, record its file names in metadata
+            if (ds.getDatasetType() == DatasetType.EXTERNAL && AqlMetadataProvider.isOptimizeExternalIndexes()) {
+                //load the file names into the external files metadata index
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                bActiveTxn = true;
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                IndexOperations.addExternalDatasetFilesToMetadata(metadataProvider, ds);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                bActiveTxn = false;
+            }
+
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -654,6 +666,8 @@
                     throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
                             + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
+
+                //TODO: if the dataset is external, also remove its external files from metadata
             }
             throw e;
         } finally {
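
Note: the mdTxnCtx/bActiveTxn idiom added above brackets each metadata operation
so that a failure aborts only a live transaction. A minimal self-contained sketch
of the idiom, with DummyMetadataManager as a hypothetical stand-in for
MetadataManager.INSTANCE:

    public class TxnBracketSketch {
        // Hypothetical stand-in for MetadataManager.INSTANCE.
        static class DummyMetadataManager {
            Object beginTransaction() { return new Object(); }
            void commitTransaction(Object txnCtx) { /* make metadata changes durable */ }
            void abortTransaction(Object txnCtx) { /* roll the changes back */ }
        }

        public static void main(String[] args) {
            DummyMetadataManager mdMgr = new DummyMetadataManager();
            Object mdTxnCtx = null;
            boolean bActiveTxn = false;
            try {
                mdTxnCtx = mdMgr.beginTransaction();
                bActiveTxn = true;
                // ... metadata work, e.g. registering external files ...
                mdMgr.commitTransaction(mdTxnCtx);
                bActiveTxn = false; // nothing left to abort after the commit
            } catch (Exception e) {
                if (bActiveTxn) {
                    mdMgr.abortTransaction(mdTxnCtx);
                }
            }
        }
    }
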
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 7bd6c69..e2767cd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -14,13 +14,17 @@
  */
 package edu.uci.ics.asterix.file;
 
+import java.util.ArrayList;
+
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
 import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
 import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -53,6 +57,17 @@
                 metadataProvider, physicalOptimizationConfig);
         return secondaryIndexCreator.buildLoadingJobSpec();
     }
+
+    public static void addExternalDatasetFilesToMetadata(AqlMetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException, MetadataException {
+        //get the dataset's file list
+        ArrayList<ExternalFile> files = metadataProvider.getExternalDatasetFiles(dataset);
+        //add the files to metadata
+        for (ExternalFile file : files) {
+            MetadataManager.INSTANCE.addExternalFile(metadataProvider.getMetadataTxnContext(), file);
+        }
+    }
 
     public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
             AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
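
Note: with the optimization on, each external file is catalogued once and its RID
component shrinks from a string path to an int. A minimal sketch of the two lookup
directions the adapter factories receive (plain collections; the file paths and
the 0-based numbering are assumptions for illustration):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FileNumberMappingSketch {
        public static void main(String[] args) {
            // Hypothetical files of an external dataset, in catalogue order.
            List<String> files = List.of("hdfs://nn:8020/data/part-00000",
                    "hdfs://nn:8020/data/part-00001");

            // Indexing direction: file name -> file number (createIndexingAdapter).
            Map<String, Integer> nameToNumber = new HashMap<>();
            // Lookup direction: file number -> file name (createAccessByRIDAdapter).
            HashMap<Integer, String> numberToName = new HashMap<>();
            for (int number = 0; number < files.size(); number++) {
                nameToNumber.put(files.get(number), number);
                numberToName.put(number, files.get(number));
            }

            // An optimized RID stores the 4-byte number instead of the full path.
            int fileNumber = nameToNumber.get("hdfs://nn:8020/data/part-00001");
            System.out.println(fileNumber + " -> " + numberToName.get(fileNumber));
        }
    }
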
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 0d1741a..e55ae5a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory;
 import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -171,7 +172,7 @@
 			//get adapter name
 			String adapter = edsd.getAdapter();
 			//if not an hdfs adapter, throw an exception
-			if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapterFactory.HDFS_ADAPTER_NAME))
+			if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName()))
 			{
 				throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
 			}
@@ -236,10 +237,17 @@
 			}
 		}
 
-		//second add RID fields (File name and byte location)
-		fieldsNames[numSecondaryKeys] = "_file-name";
-		fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
-
+		//second, add the RID fields (file name or number, and byte location)
+		if(AqlMetadataProvider.isOptimizeExternalIndexes())
+		{
+			//the file number is an AINT32, matching getRIDRecDesc and the primary comparator factories
+			fieldsNames[numSecondaryKeys] = "_file-number";
+			fieldsTypes[numSecondaryKeys] = BuiltinType.AINT32;
+		}
+		else
+		{
+			fieldsNames[numSecondaryKeys] = "_file-name";
+			fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+		}
 		fieldsNames[numSecondaryKeys+1] = "_byte-location";
 		fieldsTypes[numSecondaryKeys+1] = BuiltinType.AINT64;
 
@@ -258,21 +266,26 @@
 		String[] fieldsNames = new String[externalItemType.getFieldNames().length+numPrimaryKeys];
 		IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length+numPrimaryKeys];
 
-		//add RID fields names
-		fieldsNames[0] = "_file-name";
+		//add RID fields names and types
+		if(AqlMetadataProvider.isOptimizeExternalIndexes())
+		{
+			fieldsNames[0] = "_file-number";
+			fieldsTypes[0] = BuiltinType.AINT32;
+		}
+		else
+		{
+			fieldsNames[0] = "_file-name";
+			fieldsTypes[0] = BuiltinType.ASTRING;
+		}
 		fieldsNames[1] = "_byte-location";
-
-		//add RID types
-		fieldsTypes[0] = BuiltinType.ASTRING;
 		fieldsTypes[1] = BuiltinType.AINT64;
-
-
 		if(numPrimaryKeys == 3)
 		{	
 			//add the row number for rc files
 			fieldsNames[2] = "_row-number";
 			fieldsTypes[2] = BuiltinType.AINT32;
 		}
+		
 		//add the original fields names and types
 		for(int i=0; i < externalItemType.getFieldNames().length; i++)
 		{
@@ -291,7 +304,14 @@
 		primaryBloomFilterKeyFields = new int[numPrimaryKeys];
 		ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
 		
-		primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
+		if(AqlMetadataProvider.isOptimizeExternalIndexes())
+		{
+			primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
+		}
+		else
+		{
+			primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
+		}
 		primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true);
 
 		primaryBloomFilterKeyFields[0]=0;
@@ -396,8 +416,16 @@
 			secondaryBloomFilterKeyFields[i] = i;
 		}
 
-		secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-				itemType, "_file-name", 0);
+		if(AqlMetadataProvider.isOptimizeExternalIndexes())
+		{
+			secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+					itemType, "_file-number", 0);
+		}
+		else
+		{
+			secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+					itemType, "_file-name", 0);
+		}
 		secondaryFieldAccessEvalFactories[numSecondaryKeys+1] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
 				itemType, "_byte-location", 0);
 		if(numPrimaryKeys == 3)
@@ -438,7 +466,7 @@
 	}
 
 	protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(JobSpecification spec) throws Exception {
-		Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, (ExternalDatasetDetails)dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
+		Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
 		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
 				indexingOpAndConstraints.second);
 		return indexingOpAndConstraints;
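
Note: the record produced for external secondary-index construction is the
original schema with a two- or three-field RID prepended (and, in the index
entry, the RID follows the declared keys). A small sketch of the layouts chosen
above, with strings standing in for the Asterix BuiltinType constants:

    public class RidLayoutSketch {
        static String[] ridFields(boolean optimize, boolean rcFile) {
            // File reference: compact int32 number when optimized, else the full name.
            String fileField = optimize ? "_file-number:int32" : "_file-name:string";
            if (rcFile) {
                // RCFile RIDs carry a third field: the row inside the block at _byte-location.
                return new String[] { fileField, "_byte-location:int64", "_row-number:int32" };
            }
            return new String[] { fileField, "_byte-location:int64" };
        }

        public static void main(String[] args) {
            for (boolean optimize : new boolean[] { false, true }) {
                for (boolean rcFile : new boolean[] { false, true }) {
                    System.out.println("optimize=" + optimize + ", rcFile=" + rcFile + ": "
                            + String.join(", ", ridFields(optimize, rcFile)));
                }
            }
        }
    }
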
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 17590c5..70124e8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -294,4 +294,10 @@
                     storageProperties.getBloomFilterFalsePositiveRate());
         }
     }
+
+    @Override
+    protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+        throw new AsterixException("Cannot create an inverted index on an external dataset due to composite RID fields.");
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 17632aa..cc8eb1b 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -18,34 +18,45 @@
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
 import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.util.ExternalIndexHashPartitionComputerFactory;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -148,8 +159,121 @@
     }
 
     @Override
+    protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+        secondaryKeyFields = createIndexStmt.getKeyFields();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException("Cannot use " + numSecondaryKeys
+                    + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+        }
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        secondaryFieldAccessEvalFactories = metadataProvider.getFormat().createMBRFactory(itemType,
+                secondaryKeyFields.get(0), numPrimaryKeys, numDimensions);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, true);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
+
+        // Add serializers and comparators for the primary index fields (the RID).
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+    }
+    
+    @Override
     public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
+            AlgebricksMetaOperatorDescriptor asterixAssignOp;
+            try {
+                //create the external indexing scan operator
+                RIDScanOpAndConstraints = createExternalIndexingOp(spec);
+                //create the assign operator
+                asterixAssignOp = createExternalAssignOp(spec);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+                        RIDScanOpAndConstraints.second);
+            } catch (Exception e) {
+                throw new AsterixException("Failed to create the external index scanning and loading job", e);
+            }
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, RIDScanOpAndConstraints.second);
+            }
+
+            // Create the secondary R-tree bulk load op.
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numNestedSecondaryKeyFields,
+                    new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                            primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+                            AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+                                    keyType, secondaryComparatorFactories.length), storageProperties
+                                    .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+
+            // Connect the operators: hash-partition the load input on the RID fields so each
+            // record reaches the partition that bulk loads it (the same hash factories apply
+            // to all input formats, including RC files).
+            IBinaryHashFunctionFactory[] hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(
+                    dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+            //select the partitioning keys: always the first two fields after the secondary keys
+            //(the file name or number, and the byte location)
+            int[] keys = new int[2];
+            keys[0] = numSecondaryKeys;
+            keys[1] = numSecondaryKeys + 1;
+
+            IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
+            spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(hashConn, selectOp, 0, secondaryBulkLoadOp, 0);
+            } else {
+                spec.connect(hashConn, asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+            }
+            spec.addRoot(secondaryBulkLoadOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+            return spec;
+        }
 
         // Create dummy key provider for feeding the primary index scan. 
         AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
@@ -193,5 +317,6 @@
         spec.addRoot(secondaryBulkLoadOp);
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
     }
 }
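
Note: in the external branch above, the bulk-load input is hash-partitioned on
the two RID fields that directly follow the secondary keys in the assign output:
the file name (or number) and the byte location. A sketch of the key index
arithmetic:

    public class RidKeyIndexSketch {
        public static void main(String[] args) {
            // Example: one declared secondary key (the spatial field of the R-tree).
            int numSecondaryKeys = 1;
            // The RID fields follow the secondary keys, so the two
            // hash-partitioning keys are the next two field indexes.
            int[] keys = { numSecondaryKeys, numSecondaryKeys + 1 };
            System.out.println("hash-partition on fields " + keys[0] + " and " + keys[1]);
        }
    }
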
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ccab0f4..7e63e84 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -116,7 +116,7 @@
 
 	@Override
 	public IControlledAdapter createAccessByRIDAdapter(
-			Map<String, Object> configuration, IAType atype) throws Exception {
+			Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
 		Configuration conf = configureHadoopConnection(configuration);
 		clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
 		
@@ -129,19 +129,19 @@
 			char delimeter = 0x01;
 			configuration.put(KEY_FORMAT, FORMAT_DELIMITED_TEXT);
 			configuration.put(KEY_DELIMITER, Character.toString(delimeter));
-			ridRecordDesc = getRIDRecDesc(true);
+			ridRecordDesc = getRIDRecDesc(true, files != null);
 		}
 		else
 		{
-			ridRecordDesc = getRIDRecDesc(false);
+			ridRecordDesc = getRIDRecDesc(false, files != null);
 		}
-		HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf);
+		HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf, files);
 		adapter.configure(configuration);
 		return adapter;
 	}
 
 	@Override
-	public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+	public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
 		if (!setup) {
 			/** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
 			configureJobConf(configuration);
@@ -170,7 +170,7 @@
 			configuration.put(KEY_FORMAT, FORMAT_DELIMITED_TEXT);
 			configuration.put(KEY_DELIMITER, Character.toString(delimeter));	
 		}
-		HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+		HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations, files);
 		hdfsIndexingAdapter.configure(configuration);
 		return hdfsIndexingAdapter;
 	}
@@ -199,7 +199,7 @@
 		return conf;
 	}
 	
-	public static RecordDescriptor getRIDRecDesc(boolean isRCFile){
+	public static RecordDescriptor getRIDRecDesc(boolean isRCFile, boolean optimize){
 		int numOfPrimaryKeys = 2;
 		if(isRCFile)
 		{
@@ -208,8 +208,16 @@
 		@SuppressWarnings("rawtypes")
 		ISerializerDeserializer[] serde = new ISerializerDeserializer[numOfPrimaryKeys];
 		ITypeTraits[] tt = new ITypeTraits[numOfPrimaryKeys];
-		serde[0] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.ASTRING);
-		tt[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ASTRING);
+		if(optimize)
+		{
+			serde[0] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.AINT32);
+			tt[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.AINT32);
+		}
+		else
+		{
+			serde[0] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.ASTRING);
+			tt[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ASTRING);
+		}
 		serde[1] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.AINT64);
 		tt[1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.AINT64);
 		if(isRCFile)
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index e4a1570..64c8153 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -86,7 +86,7 @@
 
 
 	@Override
-	public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+	public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
 		Configuration conf = HDFSAdapterFactory.configureHadoopConnection(configuration);
 		clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
 		//Create RID record desc
@@ -98,20 +98,20 @@
 			char delimeter = 0x01;
 			configuration.put(KEY_FORMAT, FORMAT_DELIMITED_TEXT);
 			configuration.put(KEY_DELIMITER, Character.toString(delimeter));
-			ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(true);
+			ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(true, files != null);
 		}
 		else
 		{
-			ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(false);
+			ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(false, files != null);
 		}
-		HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf);
+		HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf, files);
 		adapter.configure(configuration);
 		return adapter;
 	}
 
 	@Override
 	public IDatasourceAdapter createIndexingAdapter(
-			Map<String, Object> configuration, IAType atype) throws Exception {
+			Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
 		if (!setup) {
 			/** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
 			configureJobConf(configuration);
@@ -133,7 +133,7 @@
 		}
 		JobConf conf = confFactory.getConf();
 		InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-		HiveIndexingAdapter hiveIndexingAdapter = new HiveIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+		HiveIndexingAdapter hiveIndexingAdapter = new HiveIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations, files);
 
 		//If input format is rcfile, configure parser expected format to delimeted text with 0x01 (default ) as delimiter
 		if(((String)configuration.get(KEY_INPUT_FORMAT)).equals(INPUT_FORMAT_RC))
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 75d972e..f046f88 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.IControlledAdapter;
@@ -52,7 +53,7 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
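+     * @param files
+     *            a map from file name to file number, used when the external index optimization is on; null otherwise.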
      */
-    public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
+    public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception;
 
     /**
      * Creates an instance of IDatasourceAdapter that is used to read records using their RIDs.
@@ -65,5 +66,5 @@
      * @return An instance of IControlledAdapter.
      * @throws Exception
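+     * @param files
+     *            a map from file number back to file name, used when the external index optimization is on; null otherwise.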
      */
-    public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
+    public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception;
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 3dc6cc8..4fae7e7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.IControlledAdapter;
@@ -45,14 +46,14 @@
 
 	@Override
 	public IDatasourceAdapter createIndexingAdapter(
-			Map<String, Object> configuration, IAType atype) throws Exception {
+			Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
 		throw new NotImplementedException("Indexing Adapter is not implemented for NC FileSystem Data");
 	}
 
 
 
 	@Override
-	public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+	public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
 		throw new NotImplementedException("Access by RID Adapter is not implemented for NC FileSystem Data");
 	}
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java
index 36968ef..aa91a56 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java
@@ -1,6 +1,7 @@
 package edu.uci.ics.asterix.external.data.operator;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
@@ -25,15 +26,17 @@
 	private final IAType atype;
 	private IGenericDatasetAdapterFactory datasourceAdapterFactory;
 	private IControlledAdapter adapter;
+	private final HashMap<Integer, String> files;
 	
 	public ExternalDataAccessByRIDOperatorDescriptor(
 			IOperatorDescriptorRegistry spec, Map<String, Object> arguments, IAType atype,
-			RecordDescriptor outRecDesc,IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
+			RecordDescriptor outRecDesc,IGenericDatasetAdapterFactory dataSourceAdapterFactory, HashMap<Integer, String> files) {
 		super(spec, 1, 1);
 		this.atype = atype;
 		this.adapterConfiguration = arguments;
 		this.datasourceAdapterFactory = dataSourceAdapterFactory;
 		this.recordDescriptors[0] = outRecDesc;
+		this.files = files;
 	}
 
 	@Override
@@ -45,7 +48,7 @@
 			public void open() throws HyracksDataException {
 				//create the access by index adapter
 				try {
-					adapter = datasourceAdapterFactory.createAccessByRIDAdapter(adapterConfiguration, atype);
+					adapter = datasourceAdapterFactory.createAccessByRIDAdapter(adapterConfiguration, atype, files);
 					adapter.initialize(ctx);
 				} catch (Exception e) {
 					throw new HyracksDataException("error during creation of external read by RID adapter", e);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java
index 436c7cc..9ff1f06 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java
@@ -24,16 +24,18 @@
 	private static final long serialVersionUID = 1L;
 
 	private final Map<String, Object> adapterConfiguration;
+	private final Map<String,Integer> files;
 	private final IAType atype;
 	private IGenericDatasetAdapterFactory datasourceAdapterFactory;
 
 	public ExternalDataIndexingOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
-			RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
+			RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory, Map<String,Integer> files) {
 		super(spec, 0, 1);
 		recordDescriptors[0] = rDesc;
 		this.adapterConfiguration = arguments;
 		this.atype = atype;
 		this.datasourceAdapterFactory = dataSourceAdapterFactory;
+		this.files = files;
 	}
 
 	@Override
@@ -48,7 +50,7 @@
 				IDatasourceAdapter adapter = null;
 				try {
 					adapter = ((IGenericDatasetAdapterFactory) datasourceAdapterFactory).createIndexingAdapter(
-							adapterConfiguration, atype);
+							adapterConfiguration, atype, files);
 					adapter.initialize(ctx);
 					adapter.start(partition, writer);
 				} catch (Exception e) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java
index fd846f2..86a060c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java
@@ -18,6 +18,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.io.LongWritable;
@@ -31,17 +32,13 @@
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.RCFile.Reader;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.log4j.Logger;
-
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AInt64;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.operators.file.ControlledADMTupleParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.ControlledDelimitedDataTupleParserFactory;
@@ -51,12 +48,8 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -70,29 +63,26 @@
 public class HDFSAccessByRIDAdapter extends FileSystemBasedAdapter implements IControlledAdapter{
 
 	private static final long serialVersionUID = 1L;
-	private transient AlgebricksPartitionConstraint clusterLocations;
 	private boolean newFrame;
 	private transient ByteBuffer frameBuffer;
 	private String inputFormat;
 	private Configuration conf;
 	private transient FileSystem fs;
 	private RecordDescriptor inRecDesc;
+	private final HashMap<Integer, String> files;
 
-	public HDFSAccessByRIDAdapter(IAType atype, String inputFormat, AlgebricksPartitionConstraint clusterLocations, RecordDescriptor inRecDesc, Configuration conf) {
+	public HDFSAccessByRIDAdapter(IAType atype, String inputFormat, AlgebricksPartitionConstraint clusterLocations, RecordDescriptor inRecDesc, Configuration conf, HashMap<Integer,String> files) {
 		super(atype);
-		this.clusterLocations = clusterLocations;
 		this.inputFormat = inputFormat;
 		this.conf = conf;
 		this.inRecDesc = inRecDesc;
+		this.files = files;
 	}
 
 	@Override
 	public void configure(Map<String, Object> arguments) throws Exception {
 		this.configuration = arguments;
 		fs = FileSystem.get(conf);
-		//set up the parser factory here for now -> when everything works, make it professional
-		//The one below doesn't work for this one
-		//configureFormat();
 		String specifiedFormat = (String) configuration.get(KEY_FORMAT);
 		if (specifiedFormat == null) {
 			throw new IllegalArgumentException(" Unspecified data format");
@@ -147,8 +137,6 @@
 		return AdapterType.READ;
 	}
 
-	//modefy the initialize function
-	//add the initial set up for the adapter
 	@Override
 	public void initialize(IHyracksTaskContext ctx) throws Exception {
 		this.ctx = ctx;
@@ -157,144 +145,108 @@
 		((ControlledTupleParser)parser).initialize(getInputStream(0));
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public InputStream getInputStream(int partition) throws IOException {
 
-		//different input stream implementation based on the input format
-		if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
-		{
-			return new InputStream() {
-				private RCFile.Reader reader;
-				private int rowDifference;
-				private String lastFileName = "";
-				private String newFileName;
-				private long lastByteLocation = 0;
-				private long newByteLocation = 0;
-				private int lastRowNumber = 0;
-				private int newRowNumber = 0;
-				private LongWritable key;
-				private BytesRefArrayWritable value;
-				private int EOL = "\n".getBytes()[0];
-				private byte delimiter = 0x01;
-				private boolean pendingValue = false;
-				private int currentTupleIdx;
-				private int numberOfTuplesInCurrentFrame;
-				private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
-				private ByteBufferInputStream bbis = new ByteBufferInputStream();
-				private DataInputStream dis = new DataInputStream(bbis);
+		//if the files map is null, the index stores full file names and we return the regular input stream;
+		//otherwise the index is optimized and we return a stream that resolves file numbers through the map
+		if(files == null)
+		{	
 
-				@Override
-				public void close()
-				{
-					if (reader != null)
+			//different input stream implementation based on the input format
+			if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
+			{
+				return new InputStream() {
+					private RCFile.Reader reader;
+					private int rowDifference;
+					private String lastFileName = "";
+					private String newFileName;
+					private long lastByteLocation = 0;
+					private long newByteLocation = 0;
+					private int lastRowNumber = 0;
+					private int newRowNumber = 0;
+					private LongWritable key;
+					private BytesRefArrayWritable value;
+					private int EOL = "\n".getBytes()[0];
+					private byte delimiter = 0x01;
+					private boolean pendingValue = false;
+					private int currentTupleIdx;
+					private int numberOfTuplesInCurrentFrame;
+					private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+					private ByteBufferInputStream bbis = new ByteBufferInputStream();
+					private DataInputStream dis = new DataInputStream(bbis);
+
+					@Override
+					public void close()
 					{
-						reader.close();
-					}
-					try {
-						super.close();
-					} catch (IOException e) {
-						e.printStackTrace();
-					}
-				}
-
-				@Override
-				public int read(byte[] buffer, int offset, int len) throws IOException {
-					if(newFrame)
-					{
-						//first time called with this frame
-						//reset frame buffer
-						tupleAccessor.reset(frameBuffer);
-						//get number of tuples in frame
-						numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
-						//set tuple index to first tuple
-						currentTupleIdx = 0;
-						//set new frame to false
-						newFrame = false;
-						pendingValue = false;
-					}
-
-					//check and see if there is a pending value
-					//Double check this
-					int numBytes = 0;
-					if (pendingValue) {
-						//last value didn't fit into buffer
-						int sizeOfNextTuple = getTupleSize(value) + 1;
-						if(sizeOfNextTuple > len)
+						if (reader != null)
 						{
-							return 0;
+							reader.close();
 						}
-						copyCurrentTuple(buffer, offset + numBytes);
-						buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
-						numBytes += sizeOfNextTuple;
-						//set pending to false
-						pendingValue = false;
-						//move to next tuple
-						currentTupleIdx++;
+						try {
+							super.close();
+						} catch (IOException e) {
+							e.printStackTrace();
+						}
 					}
 
-					//No pending value or done with pending value
-					//check if there are more tuples in the frame
-					while(currentTupleIdx < numberOfTuplesInCurrentFrame)
-					{
-						//get 3 things from the current tuple in the frame(File name, byte location and row number)
-						//get the fileName
-						bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
-						newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
-						//check if it is a new file
-						if(!lastFileName.equals(newFileName))//stringBuilder.toString()))
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+						if(newFrame)
 						{
-							//new file
-							lastFileName = newFileName;
-							//close old file
-							if(reader != null)
-							{
-								reader.close();
-							}
-							//open new file
-							reader = new Reader(fs, new Path(lastFileName), conf);
-							//read and save byte location
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
-							lastByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
-							//seek
-							reader.seek(lastByteLocation);
-							//read and save rowNumber
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
-							lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
-							//loop until row
-							for(int i=0; i < lastRowNumber; i++)
-							{
-								//this loop perform a single I/O and move to the next record in the block which is already in memory
-								//if no more records in the current block, it perform another I/O and get the next block
-								//<this should never happen here>
-								reader.next(key);
-							}
-							//read record
-							reader.getCurrentRow(value);
-							//copy it to the buffer if there is enough space
+							//first time called with this frame
+							//reset frame buffer
+							tupleAccessor.reset(frameBuffer);
+							//get number of tuples in frame
+							numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+							//set tuple index to first tuple
+							currentTupleIdx = 0;
+							//set new frame to false
+							newFrame = false;
+							pendingValue = false;
+						}
+
+						//check whether a pending value was left over from the previous call
+						int numBytes = 0;
+						if (pendingValue) {
+							//last value didn't fit into buffer
 							int sizeOfNextTuple = getTupleSize(value) + 1;
-							if(sizeOfNextTuple + numBytes > len)
+							if(sizeOfNextTuple > len)
 							{
-								//mark waiting value
-								pendingValue = true;
-								return numBytes;
+								return 0;
 							}
 							copyCurrentTuple(buffer, offset + numBytes);
 							buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
 							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = false;
+							//move to next tuple
+							currentTupleIdx++;
 						}
-						else
-						{
-							//same file
-							//get the byte location
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
-							newByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
 
-							//check if same block
-							if(lastByteLocation != newByteLocation)
+						//No pending value or done with pending value
+						//check if there are more tuples in the frame
+						while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+						{
+							//get 3 things from the current tuple in the frame(File name, byte location and row number)
+							//get the fileName
+							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+							newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
+							//check if it is a new file
+							if(!lastFileName.equals(newFileName))
 							{
-								//new block
-								lastByteLocation = newByteLocation;
+								//new file
+								lastFileName = newFileName;
+								//close old file
+								if(reader != null)
+								{
+									reader.close();
+								}
+								//open new file
+								reader = new Reader(fs, new Path(lastFileName), conf);
+								//read and save byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								lastByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
 								//seek
 								reader.seek(lastByteLocation);
 								//read and save rowNumber
@@ -303,6 +255,9 @@
 								//loop until row
 								for(int i=0; i < lastRowNumber; i++)
 								{
+									//each iteration performs a single read and moves to the next record in the block, which is already in memory
+									//if there are no more records in the current block, another I/O fetches the next block
+									//(that should never happen here, since we seeked to the start of this block)
 									reader.next(key);
 								}
 								//read record
@@ -321,25 +276,497 @@
 							}
 							else
 							{
-								//same block
-								//get the row number
-								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
-								newRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+								//same file
+								//get the byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								newByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
 
-								//calculate row difference
-								rowDifference = newRowNumber - lastRowNumber;
-
-								//update last row number
-								lastRowNumber = newRowNumber;
-
-								//move to the new row
-								for(int i=0; i < rowDifference; i++)
+								//check if same block
+								if(lastByteLocation != newByteLocation)
 								{
+									//new block
+									lastByteLocation = newByteLocation;
+									//seek
+									reader.seek(lastByteLocation);
+									//read and save rowNumber
+									bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+									lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+									//loop until row
+									for(int i=0; i < lastRowNumber; i++)
+									{
+										reader.next(key);
+									}
+									//read record
+									reader.getCurrentRow(value);
+									//copy it to the buffer if there is enough space
+									int sizeOfNextTuple = getTupleSize(value) + 1;
+									if(sizeOfNextTuple + numBytes > len)
+									{
+										//mark waiting value
+										pendingValue = true;
+										return numBytes;
+									}
+									copyCurrentTuple(buffer, offset + numBytes);
+									buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+									numBytes += sizeOfNextTuple;
+								}
+								else
+								{
+									//same block
+									//get the row number
+									bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+									newRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+
+									//calculate row difference
+									rowDifference = newRowNumber - lastRowNumber;
+
+									//update last row number
+									lastRowNumber = newRowNumber;
+
+									//move to the new row
+									for(int i=0; i < rowDifference; i++)
+									{
+										reader.next(key);
+									}
+									//read record
+									reader.getCurrentRow(value);
+
+									//copy it to the buffer if there is enough space
+									int sizeOfNextTuple = getTupleSize(value) + 1;
+									if(sizeOfNextTuple + numBytes > len)
+									{
+										//mark waiting value
+										pendingValue = true;
+										return numBytes;
+									}
+									copyCurrentTuple(buffer, offset + numBytes);
+									buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+									numBytes += sizeOfNextTuple;
+								}
+							}
+							//move to next tuple
+							currentTupleIdx++;
+						}	
+						//no more tuples in frame
+						return (numBytes == 0) ? -1 : numBytes;
+					}
+
+					private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+						int rcOffset = 0;
+						for(int i=0; i< value.size(); i++)
+						{
+							System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+							rcOffset += value.get(i).getLength() + 1;
+							buffer[rcOffset - 1] = delimiter;
+						}
+					}
+
+					private int getTupleSize(BytesRefArrayWritable value2) {
+						int size = 0;
+						//loop over the rc columns and add up their lengths
+						for(int i = 0; i < value2.size(); i++)
+						{
+							size += value2.get(i).getLength();
+						}
+						//add the size of the delimiter bytes (one per column boundary)
+						size += value2.size() - 1;
+						return size;
+					}
+
+					@Override
+					public int read() throws IOException {
+						throw new NotImplementedException("Use read(byte[], int, int)");
+					}
+				};
+			}
+			else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT))
+			{
+				return new InputStream() {
+					private FSDataInputStream reader;
+					private String lastFileName = "";
+					private String newFileName;
+					private int EOL = "\n".getBytes()[0];
+					private int currentTupleIdx;
+					private int numberOfTuplesInCurrentFrame;
+					private long byteLocation;
+					private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+					private String value;
+					private String pendingValue = null;
+					private ByteBufferInputStream bbis = new ByteBufferInputStream();
+					private DataInputStream dis = new DataInputStream(bbis);
+
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+						if(newFrame)
+						{
+							//first time called with this frame
+							//reset frame buffer
+							tupleAccessor.reset(frameBuffer);
+							//get number of tuples in frame
+							numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+							//set tuple index to first tuple
+							currentTupleIdx = 0;
+							//set new frame to false
+							newFrame = false;
+						}
+
+						//check and see if there is a pending value
+						int numBytes = 0;
+						if (pendingValue != null) {
+							//last value didn't fit into buffer
+							int sizeOfNextTuple = pendingValue.length() + 1;
+							if(sizeOfNextTuple > len)
+							{
+								return 0;
+							}
+							//there is enough space
+							System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.length());
+							buffer[offset + numBytes + pendingValue.length()] = (byte) EOL;
+							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = null;
+							//move to next tuple
+							currentTupleIdx++;
+						}
+
+						//No pending value or done with pending value
+						//check if there are more tuples in the frame
+						while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+						{
+							//get the fileName
+							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+							newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
+							//check if it is a new file
+							if(!lastFileName.equals(newFileName))
+							{
+								//new file
+								lastFileName = newFileName;
+								//close old file
+								if(reader != null)
+								{
+									reader.close();
+								}
+								//open new file
+								reader = fs.open(new Path(lastFileName));
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								value = reader.readLine();
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.length() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+								buffer[offset + numBytes + value.length()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							else
+							{
+								//same file, just seek and read
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								value = reader.readLine();
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.length() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+								buffer[offset + numBytes + value.length()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							currentTupleIdx++;
+						}
+						return (numBytes == 0) ? -1 : numBytes;
+					}
+
+					@Override
+					public int read() throws IOException {
+						throw new NotImplementedException("Use read(byte[], int, int)");
+					}
+
+					@Override
+					public void close(){
+						try {
+							if (reader != null)
+							{
+								reader.close();
+							}
+							super.close();
+						} catch (IOException e) {
+							e.printStackTrace();
+						}
+					}
+
+				};
+			}
+			else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))
+			{
+				return new InputStream() {
+					private SequenceFile.Reader reader;
+					private Writable key;
+					private Text value = new Text();
+					private String lastFileName = "";
+					private String newFileName;
+					private long byteLocation;
+					private int EOL = "\n".getBytes()[0];
+					private int currentTupleIdx;
+					private int numberOfTuplesInCurrentFrame;
+					private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+					private Text pendingValue = null;
+					private ByteBufferInputStream bbis = new ByteBufferInputStream();
+					private DataInputStream dis = new DataInputStream(bbis);
+
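+					/* Same RID protocol as the text-format stream above, but records are
+					 * reloaded through SequenceFile.Reader: seek(byteLocation) positions
+					 * the reader at the stored record boundary and next(key, value)
+					 * re-reads the indexed record. */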
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+
+						if(newFrame)
+						{
+							//first time called with this frame
+							//reset frame buffer
+							tupleAccessor.reset(frameBuffer);
+							//get number of tuples in frame
+							numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+							//set tuple index to first tuple
+							currentTupleIdx = 0;
+							//set new frame to false
+							newFrame = false;
+						}
+
+						//check and see if there is a pending value from the previous call
+						int numBytes = 0;
+						if (pendingValue != null) {
+							//last value didn't fit into buffer
+							int sizeOfNextTuple = pendingValue.getLength() + 1;
+							if(sizeOfNextTuple > len)
+							{
+								return 0;
+							}
+							//there is enough space
+							System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+							buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = null;
+							//move to next tuple
+							currentTupleIdx++;
+						}
+
+						//No pending value or done with pending value
+						//check if there are more tuples in the frame
+						while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+						{
+							//get the fileName
+							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+							newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
+							//check if it is a new file
+							if(!lastFileName.equals(newFileName))
+							{
+								//new file
+								lastFileName = newFileName;
+								//close old file
+								if(reader != null)
+								{
+									reader.close();
+								}
+								//open new file
+								reader = new SequenceFile.Reader(fs, new Path(lastFileName), conf);
+								key = (Writable) org.apache.hadoop.util.ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								reader.next(key, value);
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.getLength() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+								buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							else
+							{
+								//same file, just seek and read
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								reader.next(key, value);
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.getLength() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+								buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							currentTupleIdx++;
+						}
+						return (numBytes == 0) ? -1 : numBytes;
+					}
+
+					@Override
+					public int read() throws IOException {
+						throw new NotImplementedException("Use read(byte[], int, int)");
+					}
+
+					@Override
+					public void close(){
+						try {
+							if (reader != null)
+							{
+								reader.close();
+							}
+							super.close();
+						} catch (IOException e) {
+							e.printStackTrace();
+						}
+					}
+				};
+			}
+			//unknown input format
+			throw new IOException("Unknown input format");
+		}
+		else
+		{
+			//optimized
+			//different input stream implementation based on the input format
+			if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
+			{
+				return new InputStream() {
+					private RCFile.Reader reader;
+					private int rowDifference;
+					private int lastFileNumber = -1;
+					private int newFileNumber = 0;
+					private long lastByteLocation = 0;
+					private long newByteLocation = 0;
+					private int lastRowNumber = 0;
+					private int newRowNumber = 0;
+					private LongWritable key = new LongWritable();
+					private BytesRefArrayWritable value = new BytesRefArrayWritable();
+					private int EOL = "\n".getBytes()[0];
+					private byte delimiter = 0x01;
+					private boolean pendingValue = false;
+					private int currentTupleIdx;
+					private int numberOfTuplesInCurrentFrame;
+					private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+					private ByteBufferInputStream bbis = new ByteBufferInputStream();
+					private DataInputStream dis = new DataInputStream(bbis);
+
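+					/* Optimized access path: the secondary index stores a compact RID of
+					 * (file number: AInt32, block byte offset: AInt64, row number: AInt32).
+					 * The file number is resolved through the adapter's files mapping, and
+					 * consecutive RIDs that fall into the same block are served by skipping
+					 * rowDifference rows forward instead of re-seeking the reader. */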
+					@Override
+					public void close()
+					{
+						if (reader != null)
+						{
+							reader.close();
+						}
+						try {
+							super.close();
+						} catch (IOException e) {
+							e.printStackTrace();
+						}
+					}
+
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+						if(newFrame)
+						{
+							//first time called with this frame
+							//reset frame buffer
+							tupleAccessor.reset(frameBuffer);
+							//get number of tuples in frame
+							numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+							//set tuple index to first tuple
+							currentTupleIdx = 0;
+							//set new frame to false
+							newFrame = false;
+							pendingValue = false;
+						}
+
+						//check and see if there is a pending value from the previous call
+						int numBytes = 0;
+						if (pendingValue) {
+							//last value didn't fit into buffer
+							int sizeOfNextTuple = getTupleSize(value) + 1;
+							if(sizeOfNextTuple > len)
+							{
+								return 0;
+							}
+							copyCurrentTuple(buffer, offset + numBytes);
+							buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = false;
+							//move to next tuple
+							currentTupleIdx++;
+						}
+
+						//No pending value or done with pending value
+						//check if there are more tuples in the frame
+						while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+						{
+							//get 3 things from the current tuple in the frame(File name, byte location and row number)
+							//get the fileName
+							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+							newFileNumber = ((AInt32) inRecDesc.getFields()[0].deserialize(dis)).getIntegerValue();
+							//check if it is a new file
+							if(lastFileNumber != newFileNumber)
+							{
+								//new file
+								lastFileNumber = newFileNumber;
+								//close old file
+								if(reader != null)
+								{
+									reader.close();
+								}
+								//open new file
+								reader = new Reader(fs, new Path(files.get(newFileNumber)), conf);
+								//read and save byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								lastByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(lastByteLocation);
+								//read and save rowNumber
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+								lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+								//loop until row
+								for(int i=0; i < lastRowNumber; i++)
+								{
+									//this loop performs a single I/O and moves to the next record in the block, which is already in memory
+									//if there are no more records in the current block, it performs another I/O to fetch the next block
+									//<this should never happen here>
 									reader.next(key);
 								}
 								//read record
 								reader.getCurrentRow(value);
-
 								//copy it to the buffer if there is enough space
 								int sizeOfNextTuple = getTupleSize(value) + 1;
 								if(sizeOfNextTuple + numBytes > len)
@@ -352,316 +779,387 @@
 								buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
 								numBytes += sizeOfNextTuple;
 							}
-						}
-						//move to next tuple
-						currentTupleIdx++;
-					}	
-					//no more tuples in frame
-					return (numBytes == 0) ? -1 : numBytes;
-				}
+							else
+							{
+								//same file
+								//get the byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								newByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
 
-				private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
-					int rcOffset = 0;
-					for(int i=0; i< value.size(); i++)
-					{
-						System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
-						rcOffset += value.get(i).getLength() + 1;
-						buffer[rcOffset - 1] = delimiter;
-					}
-				}
+								//check if same block
+								if(lastByteLocation != newByteLocation)
+								{
+									//new block
+									lastByteLocation = newByteLocation;
+									//seek
+									reader.seek(lastByteLocation);
+									//read and save rowNumber
+									bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+									lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+									//loop until row
+									for(int i=0; i < lastRowNumber; i++)
+									{
+										reader.next(key);
+									}
+									//read record
+									reader.getCurrentRow(value);
+									//copy it to the buffer if there is enough space
+									int sizeOfNextTuple = getTupleSize(value) + 1;
+									if(sizeOfNextTuple + numBytes > len)
+									{
+										//mark waiting value
+										pendingValue = true;
+										return numBytes;
+									}
+									copyCurrentTuple(buffer, offset + numBytes);
+									buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+									numBytes += sizeOfNextTuple;
+								}
+								else
+								{
+									//same block
+									//get the row number
+									bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+									newRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
 
-				private int getTupleSize(BytesRefArrayWritable value2) {
-					int size=0;
-					//loop over rc column and add lengths
-					for(int i=0; i< value.size(); i++)
-					{
-						size += value.get(i).getLength();
-					}
-					//add delimeters bytes sizes
-					size += value.size() -1;
-					return size;
-				}
+									//calculate row difference
+									rowDifference = newRowNumber - lastRowNumber;
 
-				@Override
-				public int read() throws IOException {
-					throw new NotImplementedException("Use read(byte[], int, int");
-				}
-			};
-		}
-		else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT))
-		{
-			return new InputStream() {
-				private FSDataInputStream reader;
-				private String lastFileName = "";
-				private String newFileName;
-				private int EOL = "\n".getBytes()[0];
-				private int currentTupleIdx;
-				private int numberOfTuplesInCurrentFrame;
-				private long byteLocation;
-				private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
-				private String value;
-				private String pendingValue = null;
-				private ByteBufferInputStream bbis = new ByteBufferInputStream();
-				private DataInputStream dis = new DataInputStream(bbis);
+									//update last row number
+									lastRowNumber = newRowNumber;
 
-				@Override
-				public int read(byte[] buffer, int offset, int len) throws IOException {
-					if(newFrame)
-					{
-						//first time called with this frame
-						//reset frame buffer
-						tupleAccessor.reset(frameBuffer);
-						//get number of tuples in frame
-						numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
-						//set tuple index to first tuple
-						currentTupleIdx = 0;
-						//set new frame to false
-						newFrame = false;
+									//move to the new row
+									for(int i=0; i < rowDifference; i++)
+									{
+										reader.next(key);
+									}
+									//read record
+									reader.getCurrentRow(value);
+
+									//copy it to the buffer if there is enough space
+									int sizeOfNextTuple = getTupleSize(value) + 1;
+									if(sizeOfNextTuple + numBytes > len)
+									{
+										//mark waiting value
+										pendingValue = true;
+										return numBytes;
+									}
+									copyCurrentTuple(buffer, offset + numBytes);
+									buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+									numBytes += sizeOfNextTuple;
+								}
+							}
+							//move to next tuple
+							currentTupleIdx++;
+						}	
+						//no more tuples in frame
+						return (numBytes == 0) ? -1 : numBytes;
 					}
 
-					//check and see if there is a pending value
-					int numBytes = 0;
-					if (pendingValue != null) {
-						//last value didn't fit into buffer
-						int sizeOfNextTuple = pendingValue.length() + 1;
-						if(sizeOfNextTuple > len)
+					private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+						int rcOffset = 0;
+						for(int i=0; i< value.size(); i++)
 						{
-							return 0;
+							System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+							rcOffset += value.get(i).getLength() + 1;
+							buffer[offset + rcOffset - 1] = delimiter;
 						}
-						//there is enough space
-						System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.length());
-						buffer[offset + numBytes + pendingValue.length()] = (byte) EOL;
-						numBytes += sizeOfNextTuple;
-						//set pending to false
-						pendingValue = null;
-						//move to next tuple
-						currentTupleIdx++;
 					}
-					
-					//No pending value or done with pending value
-					//check if there are more tuples in the frame
-					while(currentTupleIdx < numberOfTuplesInCurrentFrame)
-					{
-						//get the fileName
-						bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
-						newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
-						//check if it is a new file
-						if(!lastFileName.equals(newFileName))
+
+					private int getTupleSize(BytesRefArrayWritable value2) {
+						int size = 0;
+						//loop over the rc columns and add their lengths
+						for(int i = 0; i < value2.size(); i++)
 						{
-							//new file
-							lastFileName = newFileName;
-							//close old file
-							if(reader != null)
+							size += value2.get(i).getLength();
+						}
+						//add the delimiter bytes
+						size += value2.size() - 1;
+						return size;
+					}
+
+					@Override
+					public int read() throws IOException {
+						throw new NotImplementedException("Use read(byte[], int, int)");
+					}
+				};
+			}
+			else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT))
+			{
+				return new InputStream() {
+					private FSDataInputStream reader;
+					private int lastFileNumber = -1;
+					private int newFileNumber = 0;
+					private int EOL = "\n".getBytes()[0];
+					private int currentTupleIdx;
+					private int numberOfTuplesInCurrentFrame;
+					private long byteLocation;
+					private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+					private String value;
+					private String pendingValue = null;
+					private ByteBufferInputStream bbis = new ByteBufferInputStream();
+					private DataInputStream dis = new DataInputStream(bbis);
+
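+					/* Text variant of the optimized path: RIDs carry a file number instead
+					 * of the full file name, keeping index entries small; the number is
+					 * mapped back to a path through the files mapping before opening the
+					 * reader. */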
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+						if(newFrame)
+						{
+							//first time called with this frame
+							//reset frame buffer
+							tupleAccessor.reset(frameBuffer);
+							//get number of tuples in frame
+							numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+							//set tuple index to first tuple
+							currentTupleIdx = 0;
+							//set new frame to false
+							newFrame = false;
+						}
+
+						//check and see if there is a pending value
+						int numBytes = 0;
+						if (pendingValue != null) {
+							//last value didn't fit into buffer
+							int sizeOfNextTuple = pendingValue.length() + 1;
+							if(sizeOfNextTuple > len)
+							{
+								return 0;
+							}
+							//there is enough space
+							System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.length());
+							buffer[offset + numBytes + pendingValue.length()] = (byte) EOL;
+							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = null;
+							//move to next tuple
+							currentTupleIdx++;
+						}
+
+						//No pending value or done with pending value
+						//check if there are more tuples in the frame
+						while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+						{
+							//get the file number
+							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+							newFileNumber = ((AInt32) inRecDesc.getFields()[0].deserialize(dis)).getIntegerValue();
+							//check if it is a new file
+							if(lastFileNumber != newFileNumber)
+							{
+								//new file
+								lastFileNumber = newFileNumber;
+								//close old file
+								if(reader != null)
+								{
+									reader.close();
+								}
+								
+								//open new file
+								reader = fs.open(new Path(files.get(newFileNumber)));
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								value = reader.readLine();
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.length() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+								buffer[offset + numBytes + value.length()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							else
+							{
+								//same file, just seek and read
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								value = reader.readLine();
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.length() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+								buffer[offset + numBytes + value.length()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							currentTupleIdx++;
+						}
+						return (numBytes == 0) ? -1 : numBytes;
+					}
+
+					@Override
+					public int read() throws IOException {
+						throw new NotImplementedException("Use read(byte[], int, int)");
+					}
+
+					@Override
+					public void close(){
+						try {
+							if (reader != null)
 							{
 								reader.close();
 							}
-							//open new file
-							reader = fs.open(new Path(lastFileName));
-							//read byte location
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
-							byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
-							//seek
-							reader.seek(byteLocation);
-							//read record
-							value = reader.readLine();
-							//copy it to the buffer if there is enough space
-							int sizeOfNextTuple = value.length() + 1;
-							if(sizeOfNextTuple + numBytes > len)
+							super.close();
+						} catch (IOException e) {
+							e.printStackTrace();
+						}
+					}
+
+				};
+			}
+			else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))
+			{
+				return new InputStream() {
+					private SequenceFile.Reader reader;
+					private Writable key;
+					private Text value = new Text();
+					private int lastFileNumber = -1;
+					private int newFileNumber = 0;
+					private long byteLocation;
+					private int EOL = "\n".getBytes()[0];
+					private int currentTupleIdx;
+					private int numberOfTuplesInCurrentFrame;
+					private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+					private Text pendingValue = null;
+					private ByteBufferInputStream bbis = new ByteBufferInputStream();
+					private DataInputStream dis = new DataInputStream(bbis);
+
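+					/* Sequence-file variant of the optimized path: identical RID handling,
+					 * with seek(byteLocation) followed by next(key, value) to reload the
+					 * record stored at the indexed position. */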
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+
+						if(newFrame)
+						{
+							//first time called with this frame
+							//reset frame buffer
+							tupleAccessor.reset(frameBuffer);
+							//get number of tuples in frame
+							numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+							//set tuple index to first tuple
+							currentTupleIdx = 0;
+							//set new frame to false
+							newFrame = false;
+						}
+
+						//check and see if there is a pending value from the previous call
+						int numBytes = 0;
+						if (pendingValue != null) {
+							//last value didn't fit into buffer
+							int sizeOfNextTuple = pendingValue.getLength() + 1;
+							if(sizeOfNextTuple > len)
 							{
-								//mark waiting value
-								pendingValue = value;
-								return numBytes;
+								return 0;
 							}
-							System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
-							buffer[offset + numBytes + value.length()] = (byte) EOL;
+							//there is enough space
+							System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+							buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
 							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = null;
+							//move to next tuple
+							currentTupleIdx++;
 						}
-						else
+
+						//No pending value or done with pending value
+						//check if there are more tuples in the frame
+						while(currentTupleIdx < numberOfTuplesInCurrentFrame)
 						{
-							//same file, just seek and read
-							//read byte location
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
-							byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
-							//seek
-							reader.seek(byteLocation);
-							//read record
-							value = reader.readLine();
-							//copy it to the buffer if there is enough space
-							int sizeOfNextTuple = value.length() + 1;
-							if(sizeOfNextTuple + numBytes > len)
+							//get the file number
+							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+							newFileNumber = ((AInt32) inRecDesc.getFields()[0].deserialize(dis)).getIntegerValue();
+							//check if it is a new file
+							if(lastFileNumber != newFileNumber)
 							{
-								//mark waiting value
-								pendingValue = value;
-								return numBytes;
+								//new file
+								lastFileNumber = newFileNumber;
+								//close old file
+								if(reader != null)
+								{
+									reader.close();
+								}
+								//open new file
+								reader = new SequenceFile.Reader(fs, new Path(files.get(newFileNumber)), conf);
+								key = (Writable) org.apache.hadoop.util.ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								reader.next(key, value);
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.getLength() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+								buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
 							}
-							System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
-							buffer[offset + numBytes + value.length()] = (byte) EOL;
-							numBytes += sizeOfNextTuple;
+							else
+							{
+								//same file, just seek and read
+								//read byte location
+								bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+								byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+								//seek
+								reader.seek(byteLocation);
+								//read record
+								reader.next(key, value);
+								//copy it to the buffer if there is enough space
+								int sizeOfNextTuple = value.getLength() + 1;
+								if(sizeOfNextTuple + numBytes > len)
+								{
+									//mark waiting value
+									pendingValue = value;
+									return numBytes;
+								}
+								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+								buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
+							currentTupleIdx++;
 						}
-						currentTupleIdx++;
-					}
-							return (numBytes == 0) ? -1 : numBytes;
-				}
-
-				@Override
-				public int read() throws IOException {
-					throw new NotImplementedException("Use read(byte[], int, int");
-				}
-
-				@Override
-				public void close(){
-					try {
-						if (reader != null)
-						{
-							reader.close();
-						}
-						super.close();
-					} catch (IOException e) {
-						e.printStackTrace();
-					}
-				}
-
-			};
-		}
-		else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))
-		{
-			return new InputStream() {
-				private SequenceFile.Reader reader;
-				private Writable key;
-				private Text value;
-				private String lastFileName = "";
-				private String newFileName;
-				private long byteLocation;
-				private int EOL = "\n".getBytes()[0];
-				private int currentTupleIdx;
-				private int numberOfTuplesInCurrentFrame;
-				private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
-				private Text pendingValue = null;
-				private ByteBufferInputStream bbis = new ByteBufferInputStream();
-				private DataInputStream dis = new DataInputStream(bbis);
-
-				@Override
-				public int read(byte[] buffer, int offset, int len) throws IOException {
-					
-					if(newFrame)
-					{
-						//first time called with this frame
-						//reset frame buffer
-						tupleAccessor.reset(frameBuffer);
-						//get number of tuples in frame
-						numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
-						//set tuple index to first tuple
-						currentTupleIdx = 0;
-						//set new frame to false
-						newFrame = false;
+						return (numBytes == 0) ? -1 : numBytes;
 					}
 
-					//check and see if there is a pending value
-					//Double check this
-					int numBytes = 0;
-					if (pendingValue != null) {
-						//last value didn't fit into buffer
-						int sizeOfNextTuple = pendingValue.getLength() + 1;
-						if(sizeOfNextTuple > len)
-						{
-							return 0;
-						}
-						//there is enough space
-						System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-						buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-						numBytes += sizeOfNextTuple;
-						//set pending to false
-						pendingValue = null;
-						//move to next tuple
-						currentTupleIdx++;
+					@Override
+					public int read() throws IOException {
+						throw new NotImplementedException("Use read(byte[], int, int)");
 					}
 
-					//No pending value or done with pending value
-					//check if there are more tuples in the frame
-					while(currentTupleIdx < numberOfTuplesInCurrentFrame)
-					{
-						//get the fileName]
-						bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
-						newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
-						//check if it is a new file
-						if(!lastFileName.equals(newFileName))
-						{
-							//new file
-							lastFileName = newFileName;
-							//close old file
-							if(reader != null)
+					@Override
+					public void close(){
+						try {
+							if (reader != null)
 							{
 								reader.close();
 							}
-							//open new file
-							reader = new SequenceFile.Reader(fs,new Path(lastFileName),conf);
-							//read byte location
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
-							byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
-							//seek
-							reader.seek(byteLocation);
-							//read record
-							reader.next(key, value);
-							//copy it to the buffer if there is enough space
-							int sizeOfNextTuple = value.getLength() + 1;
-							if(sizeOfNextTuple + numBytes > len)
-							{
-								//mark waiting value
-								pendingValue = value;
-								return numBytes;
-							}
-							System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-							buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-							numBytes += sizeOfNextTuple;
+							super.close();
+						} catch (IOException e) {
+							e.printStackTrace();
 						}
-						else
-						{
-							//same file, just seek and read
-							//read byte location
-							bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
-							byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
-							//seek
-							reader.seek(byteLocation);
-							//read record
-							reader.next(key, value);
-							//copy it to the buffer if there is enough space
-							int sizeOfNextTuple = value.getLength() + 1;
-							if(sizeOfNextTuple + numBytes > len)
-							{
-								//mark waiting value
-								pendingValue = value;
-								return numBytes;
-							}
-							System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-							buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-							numBytes += sizeOfNextTuple;
-						}
-						currentTupleIdx++;
 					}
-							return (numBytes == 0) ? -1 : numBytes;
-				}
-
-				@Override
-				public int read() throws IOException {
-					throw new NotImplementedException("Use read(byte[], int, int");
-				}
-
-				@Override
-				public void close(){
-					try {
-						if (reader != null)
-						{
-							reader.close();
-						}
-						super.close();
-					} catch (IOException e) {
-						e.printStackTrace();
-					}
-				}
-			};
+				};
+			}
+			//unknown input format
+			throw new IOException("Unknown input format");
 		}
-		//unknow format
-		throw new IOException("Unknown input format");
 	}
 
 	@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 240621b..59b39c5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -16,12 +16,11 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.log4j.Logger;
-
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 
 /**
@@ -34,34 +33,33 @@
 public class HDFSIndexingAdapter extends FileSystemBasedAdapter {
 
 	private static final long serialVersionUID = 1L;
-	public static final Logger LOGGER = Logger.getLogger(HDFSIndexingAdapter.class.getName());
 	private transient String[] readSchedule;
 	private transient boolean executed[];
 	private transient InputSplit[] inputSplits;
 	private transient JobConf conf;
 	private transient AlgebricksPartitionConstraint clusterLocations;
-
+	private final Map<String,Integer> files;
 	private transient String nodeName;
-
 	public static final byte[] fileNameFieldNameWithRecOpeningBraces = "{\"_file-name\":\"".getBytes();
+	public static final byte[] fileNameFieldClosingQuotation = "\"".getBytes();
+	public static final byte[] fileNumberFieldNameWithRecOpeningBraces = "{\"_file-number\":".getBytes();
 	public static final byte[] bytelocationFieldName = ",\"_byte-location\":".getBytes();
 	public static final byte[] bytelocationValueEnd = "i64,".getBytes();
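+	/* These byte constants wrap every indexed record with its RID in ADM syntax,
+	 * e.g. {"_file-name":"<path>","_byte-location":<pos>i64, followed by the
+	 * record's own fields (or _file-number when the optimized, number-based
+	 * mapping is used), so the regular ADM parser can consume the RID fields
+	 * together with the record itself. */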
 
 	public HDFSIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
-			AlgebricksPartitionConstraint clusterLocations) {
+			AlgebricksPartitionConstraint clusterLocations, Map<String,Integer> files) {
 		super(atype);
 		this.readSchedule = readSchedule;
 		this.executed = executed;
 		this.inputSplits = inputSplits;
 		this.conf = conf;
 		this.clusterLocations = clusterLocations;
+		this.files = files;
 	}
 
 	@Override
 	public void configure(Map<String, Object> arguments) throws Exception {
-		LOGGER.info("Configuring the adapter, why does it disappear");
 		this.configuration = arguments;
-		LOGGER.info("Configuring format");
 		configureFormat();
 	}
 
@@ -115,147 +113,84 @@
 
 	@Override
 	public InputStream getInputStream(int partition) throws IOException {
-		LOGGER.info("Creating the input stream in node: "+ nodeName);
-		//LOGGER.info("Number of input splits found = "+ inputSplits.length);
-		if(conf.getInputFormat() instanceof RCFileInputFormat)
+		if(files == null)
 		{
-			//indexing rc input format
-			return new InputStream() {
+			if(conf.getInputFormat() instanceof RCFileInputFormat)
+			{
+				//indexing rc input format
+				return new InputStream() {
 
-				private RecordReader<LongWritable, BytesRefArrayWritable> reader;
-				private LongWritable key;
-				private BytesRefArrayWritable value;
-				private boolean hasMore = false;
-				private int EOL = "\n".getBytes()[0];
-				private byte delimiter = 0x01;
-				private boolean pendingValue = false;
-				private int currentSplitIndex = 0;
-				private byte[] fileName;
-				private byte[] byteLocation;
-				private byte[] rowNumberBytes;
-				private long blockByteLocation;
-				private long NextblockByteLocation;
-				private int rowNumber;
-				
-				@SuppressWarnings("unchecked")
-				private boolean moveToNext() throws IOException {
-					for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-						/**
-						 * read all the partitions scheduled to the current node
-						 */
-						if (readSchedule[currentSplitIndex].equals(nodeName)) {
+					private RecordReader<LongWritable, BytesRefArrayWritable> reader;
+					private LongWritable key;
+					private BytesRefArrayWritable value;
+					private boolean hasMore = false;
+					private int EOL = "\n".getBytes()[0];
+					private byte delimiter = 0x01;
+					private boolean pendingValue = false;
+					private int currentSplitIndex = 0;
+					private byte[] fileName;
+					private byte[] byteLocation;
+					private byte[] rowNumberBytes;
+					private long blockByteLocation;
+					private long nextBlockByteLocation;
+					private int rowNumber;
+
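+					/* moveToNext() claims the next unread split assigned to this node (the
+					 * executed[] flags are shared among partitions on the same machine) and
+					 * primes fileName, byteLocation and rowNumber so that every emitted
+					 * tuple can be prefixed with its RID. */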
+					@SuppressWarnings("unchecked")
+					private boolean moveToNext() throws IOException {
+						for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
 							/**
-							 * pick an unread split to read
-							 * synchronize among simultaneous partitions in the same machine
+							 * read all the partitions scheduled to the current node
 							 */
-							synchronized (executed) {
-								if (executed[currentSplitIndex] == false) {
-									executed[currentSplitIndex] = true;
-								} else {
-									continue;
+							if (readSchedule[currentSplitIndex].equals(nodeName)) {
+								/**
+								 * pick an unread split to read
+								 * synchronize among simultaneous partitions in the same machine
+								 */
+								synchronized (executed) {
+									if (executed[currentSplitIndex] == false) {
+										executed[currentSplitIndex] = true;
+									} else {
+										continue;
+									}
 								}
-							}
 
-							/**
-							 * read the split
-							 */
-							reader = getRecordReader(currentSplitIndex);
-							key = reader.createKey();
-							value = reader.createValue();
-							fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
-							blockByteLocation = reader.getPos();
-							pendingValue = reader.next(key, value);
-							NextblockByteLocation = reader.getPos();
-							rowNumber = 1;
-							byteLocation = String.valueOf(blockByteLocation).getBytes("UTF-8");
-							rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
-							return true;
-						}
-					}
-					return false;
-				}
-
-				@Override
-				public int read(byte[] buffer, int offset, int len) throws IOException {
-					if (reader == null) {
-						if (!moveToNext()) {
-							//nothing to read
-							return -1;
-						}
-					}
-
-					int numBytes = 0;
-					if (pendingValue) {
-						//last value didn't fit into buffer
-						// 1 for EOL
-						int sizeOfNextTuple = getTupleSize(value) + 1;
-						//fileName.length + byteLocation.length + rowNumberBytes.length;
-
-						//copy filename
-						System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
-						buffer[offset + numBytes + fileName.length] = delimiter;
-						numBytes += fileName.length + 1;
-
-						//copy byte location
-						System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
-						buffer[offset + numBytes + byteLocation.length] = delimiter;
-						numBytes += byteLocation.length + 1;
-
-						//copy row number
-						System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
-						buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
-						numBytes += rowNumberBytes.length + 1;
-
-						copyCurrentTuple(buffer, offset + numBytes);
-						buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
-						numBytes += sizeOfNextTuple;
-						//set pending to false
-						pendingValue = false;
-					}
-
-					while (numBytes < len) {
-						hasMore = reader.next(key, value);
-						if (!hasMore) {
-							while (moveToNext()) {
-								hasMore = reader.next(key, value);
-								if (hasMore) {
-									//move to the next non-empty split
-									break;
-								}
+								/**
+								 * read the split
+								 */
+								reader = getRecordReader(currentSplitIndex);
+								key = reader.createKey();
+								value = reader.createValue();
+								fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
+								blockByteLocation = reader.getPos();
+								pendingValue = reader.next(key, value);
+								nextBlockByteLocation = reader.getPos();
+								rowNumber = 1;
+								byteLocation = String.valueOf(blockByteLocation).getBytes("UTF-8");
+								rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+								return true;
 							}
 						}
-						if (!hasMore) {
-							return (numBytes == 0) ? -1 : numBytes;
+						return false;
+					}
+
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+						if (reader == null) {
+							if (!moveToNext()) {
+								//nothing to read
+								return -1;
+							}
 						}
 
-						//check if moved to next block
-						blockByteLocation = reader.getPos();
-						if(blockByteLocation != NextblockByteLocation)
-						{
-							//moved to a new block, reset stuff
-							//row number
-							rowNumber = 1;
-							rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+						int numBytes = 0;
+						if (pendingValue) {
+							//last value didn't fit into buffer
+							// 1 for EOL
+							int sizeOfNextTuple = getTupleSize(value) + 1;
+							if (numBytes + sizeOfNextTuple +  rowNumberBytes.length + byteLocation.length + fileName.length + 3 > len) {
+								return 0;
+							}
 
-							//block location
-							byteLocation = String.valueOf(NextblockByteLocation).getBytes("UTF-8");
-							NextblockByteLocation = blockByteLocation;
-						}
-						else
-						{
-							rowNumber += 1;
-							rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
-						}
-
-						int sizeOfNextTuple = getTupleSize(value) + 1;
-						if (numBytes + sizeOfNextTuple +  rowNumberBytes.length + byteLocation.length + fileName.length + 3 > len) {
-							// cannot add tuple to current buffer
-							// but the reader has moved pass the fetched tuple
-							// we need to store this for a subsequent read call.
-							// and return this then.
-							pendingValue = true;
-							break;
-						} else {
 							//copy filename
 							System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
 							buffer[offset + numBytes + fileName.length] = delimiter;
@@ -274,140 +209,14 @@
 							copyCurrentTuple(buffer, offset + numBytes);
 							buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
 							numBytes += sizeOfNextTuple;
-						}
-					}
-					return numBytes;
-				}
-
-				private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
-					int rcOffset = 0;
-					for(int i=0; i< value.size(); i++)
-					{
-						System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
-						rcOffset += value.get(i).getLength() + 1;
-						buffer[rcOffset - 1] = delimiter;
-					}
-				}
-
-				private int getTupleSize(BytesRefArrayWritable value2) {
-					int size=0;
-					//loop over rc column and add lengths
-					for(int i=0; i< value.size(); i++)
-					{
-						size += value.get(i).getLength();
-					}
-					//add delimeters bytes sizes
-					size += value.size() -1;
-					return size;
-				}
-
-				@Override
-				public int read() throws IOException {
-					throw new NotImplementedException("Use read(byte[], int, int");
-				}
-
-				private RecordReader getRecordReader(int slitIndex) throws IOException {
-					RCFileInputFormat format = (RCFileInputFormat) conf.getInputFormat();
-					RecordReader reader = format.getRecordReader(
-							(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
-					return reader;
-				}
-
-			};
-		}
-		else
-		{
-			//get content format
-			if(configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT))
-			{
-				LOGGER.info("Creating the indexing input stream with delimiter = "+ configuration.get(KEY_DELIMITER));
-				//reading data and RIDs for delimited text
-				return new InputStream() {
-
-					private RecordReader<Object, Text> reader;
-					private Object key;
-					private Text value;
-					private boolean hasMore = false;
-					private int EOL = "\n".getBytes()[0];
-					private Text pendingValue = null;
-					private int currentSplitIndex = 0;
-					private byte[] fileName;
-					private byte[] byteLocation;
-					private byte delimiter = ((String)configuration.get(KEY_DELIMITER)).getBytes()[0];
-					
-					@SuppressWarnings("unchecked")
-					private boolean moveToNext() throws IOException {
-						for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-							/**
-							 * read all the partitions scheduled to the current node
-							 */
-							if (readSchedule[currentSplitIndex].equals(nodeName)) {
-								/**
-								 * pick an unread split to read
-								 * synchronize among simultaneous partitions in the same machine
-								 */
-								synchronized (executed) {
-									if (executed[currentSplitIndex] == false) {
-										executed[currentSplitIndex] = true;
-									} else {
-										continue;
-									}
-								}
-
-								/**
-								 * read the split
-								 */
-								reader = getRecordReader(currentSplitIndex);
-								key = reader.createKey();
-								value = (Text) reader.createValue();
-								fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
-								return true;
-							}
-						}
-						return false;
-					}
-
-					@Override
-					public int read(byte[] buffer, int offset, int len) throws IOException {
-						if (reader == null) {
-							if (!moveToNext()) {
-								//nothing to read
-								return -1;
-							}
-						}
-
-						int numBytes = 0;
-						if (pendingValue != null) {
-							int sizeOfNextTuple = pendingValue.getLength() + 1;
-							if (numBytes + sizeOfNextTuple +byteLocation.length + fileName.length + 2> len)
-							{
-								return numBytes;
-							}
-							//copy filename
-							System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
-							buffer[offset + numBytes + fileName.length] = delimiter;
-							numBytes += fileName.length + 1;
-
-							//copy byte location
-							System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
-							buffer[offset + numBytes + byteLocation.length] = delimiter;
-							numBytes += byteLocation.length + 1;
-
-							//copy actual value
-							System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-							buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-							numBytes += pendingValue.getLength() + 1;
-							pendingValue = null;
+							//set pending to false
+							pendingValue = false;
 						}
 
 						while (numBytes < len) {
-							//get reader position before you actually read
-							byteLocation = String.valueOf(reader.getPos()).getBytes();
 							hasMore = reader.next(key, value);
 							if (!hasMore) {
 								while (moveToNext()) {
-									//get reader position before you actually read
-									byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
 									hasMore = reader.next(key, value);
 									if (hasMore) {
 										//move to the next non-empty split
@@ -418,145 +227,198 @@
 							if (!hasMore) {
 								return (numBytes == 0) ? -1 : numBytes;
 							}
-							int sizeOfNextTuple = value.getLength() + 1;
-							if (numBytes + sizeOfNextTuple +byteLocation.length + fileName.length + 2> len) {
+
+							//check if moved to next block
+							blockByteLocation = reader.getPos();
+							if(blockByteLocation != nextBlockByteLocation)
+							{
+								//moved to a new block, reset stuff
+								//row number
+								rowNumber = 1;
+								rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+
+								//block location
+								byteLocation = String.valueOf(nextBlockByteLocation).getBytes("UTF-8");
+								nextBlockByteLocation = blockByteLocation;
+							}
+							else
+							{
+								rowNumber += 1;
+								rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+							}
+
+							int sizeOfNextTuple = getTupleSize(value) + 1;
+							if (numBytes + sizeOfNextTuple +  rowNumberBytes.length + byteLocation.length + fileName.length + 3 > len) {
 								// cannot add tuple to current buffer
 								// but the reader has moved pass the fetched tuple
 								// we need to store this for a subsequent read call.
 								// and return this then.
-								pendingValue = value;
+								pendingValue = true;
 								break;
 							} else {
 								//copy filename
 								System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
 								buffer[offset + numBytes + fileName.length] = delimiter;
 								numBytes += fileName.length + 1;
-								
+
 								//copy byte location
 								System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
 								buffer[offset + numBytes + byteLocation.length] = delimiter;
 								numBytes += byteLocation.length + 1;
 
-								//Copy actual value
-								System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-								buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+								//copy row number
+								System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
+								buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
+								numBytes += rowNumberBytes.length + 1;
+
+								copyCurrentTuple(buffer, offset + numBytes);
+								buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
 								numBytes += sizeOfNextTuple;
 							}
 						}
 						return numBytes;
 					}
 
+					private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+						int rcOffset = 0;
+						for(int i=0; i< value.size(); i++)
+						{
+							System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+							rcOffset += value.get(i).getLength() + 1;
+							buffer[offset + rcOffset - 1] = delimiter;
+						}
+					}
+
+					private int getTupleSize(BytesRefArrayWritable value2) {
+						int size = 0;
+						//loop over the rc columns and add their lengths
+						for(int i = 0; i < value2.size(); i++)
+						{
+							size += value2.get(i).getLength();
+						}
+						//add the delimiter bytes
+						size += value2.size() - 1;
+						return size;
+					}
+
 					@Override
 					public int read() throws IOException {
 						throw new NotImplementedException("Use read(byte[], int, int");
 					}
 
 					private RecordReader getRecordReader(int slitIndex) throws IOException {
-						if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-							SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-							RecordReader reader = format.getRecordReader(
-									(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
-							return reader;
-						} else {
-							TextInputFormat format = (TextInputFormat) conf.getInputFormat();
-							RecordReader reader = format.getRecordReader(
-									(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
-							return reader;
-						}
+						RCFileInputFormat format = (RCFileInputFormat) conf.getInputFormat();
+						RecordReader reader = format.getRecordReader(
+								(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+						return reader;
 					}
 
 				};
 			}
-			else if((configuration.get(KEY_FORMAT).equals(FORMAT_ADM)))
+			else
 			{
-				//reading data and RIDs for adm formatted data
-				return new InputStream() {
+				//get content format
+				if(configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT))
+				{
+					//reading data and RIDs for delimited text
+					return new InputStream() {
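+						//emits each record as: file name, byte offset of the record within the file, then the original delimited tuple, separated by the configured delimiter and terminated by EOL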
 
-					private RecordReader<Object, Text> reader;
-					private Object key;
-					private Text value;
-					private boolean hasMore = false;
-					private int EOL = "\n".getBytes()[0];
-					private Text pendingValue = null;
-					private int currentSplitIndex = 0;
-					private byte[] fileName;
-					private byte[] byteLocation;
+						private RecordReader<Object, Text> reader;
+						private Object key;
+						private Text value;
+						private boolean hasMore = false;
+						private int EOL = "\n".getBytes()[0];
+						private Text pendingValue = null;
+						private int currentSplitIndex = 0;
+						private byte[] fileName;
+						private byte[] byteLocation;
+						private byte delimiter = ((String)configuration.get(KEY_DELIMITER)).getBytes()[0];
 
-					@SuppressWarnings("unchecked")
-					private boolean moveToNext() throws IOException {
-						for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-							/**
-							 * read all the partitions scheduled to the current node
-							 */
-							if (readSchedule[currentSplitIndex].equals(nodeName)) {
+						@SuppressWarnings("unchecked")
+						private boolean moveToNext() throws IOException {
+							for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
 								/**
-								 * pick an unread split to read
-								 * synchronize among simultaneous partitions in the same machine
+								 * read all the partitions scheduled to the current node
 								 */
-								synchronized (executed) {
-									if (executed[currentSplitIndex] == false) {
-										executed[currentSplitIndex] = true;
-									} else {
-										continue;
+								if (readSchedule[currentSplitIndex].equals(nodeName)) {
+									/**
+									 * pick an unread split to read
+									 * synchronize among simultaneous partitions in the same machine
+									 */
+									synchronized (executed) {
+										if (executed[currentSplitIndex] == false) {
+											executed[currentSplitIndex] = true;
+										} else {
+											continue;
+										}
 									}
-								}
 
-								/**
-								 * read the split
-								 */
-								reader = getRecordReader(currentSplitIndex);
-								key = reader.createKey();
-								value = (Text) reader.createValue();
-								fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
-								return true;
-							}
-						}
-						return false;
-					}
-
-					@Override
-					public int read(byte[] buffer, int offset, int len) throws IOException {
-						if (reader == null) {
-							if (!moveToNext()) {
-								//nothing to read
-								return -1;
-							}
-						}
-
-						int numBytes = 0;
-						if (pendingValue != null) {
-							System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-							buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-							numBytes += pendingValue.getLength() + 1;
-							pendingValue = null;
-						}
-
-						while (numBytes < len) {
-							//get reader position before you actually read
-							byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
-							hasMore = reader.next(key, value);
-							if (!hasMore) {
-								while (moveToNext()) {
-									//get reader position before you actually read
-									byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
-									hasMore = reader.next(key, value);
-									if (hasMore) {
-										//move to the next non-empty split
-										break;
-									}
+									/**
+									 * read the split
+									 */
+									reader = getRecordReader(currentSplitIndex);
+									key = reader.createKey();
+									value = (Text) reader.createValue();
+									fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
+									return true;
 								}
 							}
-							if (!hasMore) {
-								return (numBytes == 0) ? -1 : numBytes;
+							return false;
+						}
+
+						@Override
+						public int read(byte[] buffer, int offset, int len) throws IOException {
+							if (reader == null) {
+								if (!moveToNext()) {
+									//nothing to read
+									return -1;
+								}
 							}
-							//get the index of the first field name
-							int firstFieldLocation = value.find("\"");
-							int admValueSize = value.getLength();
-							if(firstFieldLocation >= 0)
-							{
-								int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
-								int sizeOfNextTupleAndRID = fileNameFieldNameWithRecOpeningBraces.length + fileName.length + bytelocationFieldName.length  + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
-								if (numBytes + sizeOfNextTupleAndRID > len) {
+
+							int numBytes = 0;
+							if (pendingValue != null) {
+								int sizeOfNextTuple = pendingValue.getLength() + 1;
+								if (numBytes + sizeOfNextTuple + byteLocation.length + fileName.length + 2 > len)
+								{
+									return numBytes;
+								}
+								//copy filename
+								System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
+								buffer[offset + numBytes + fileName.length] = delimiter;
+								numBytes += fileName.length + 1;
+
+								//copy byte location
+								System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+								buffer[offset + numBytes + byteLocation.length] = delimiter;
+								numBytes += byteLocation.length + 1;
+
+								//copy actual value
+								System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+								buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+								numBytes += pendingValue.getLength() + 1;
+								pendingValue = null;
+							}
+
+							while (numBytes < len) {
+								//get reader position before you actually read
+								byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+								hasMore = reader.next(key, value);
+								if (!hasMore) {
+									while (moveToNext()) {
+										//get reader position before you actually read
+										byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+										hasMore = reader.next(key, value);
+										if (hasMore) {
+											//move to the next non-empty split
+											break;
+										}
+									}
+								}
+								if (!hasMore) {
+									return (numBytes == 0) ? -1 : numBytes;
+								}
+								int sizeOfNextTuple = value.getLength() + 1;
+								if (numBytes + sizeOfNextTuple + byteLocation.length + fileName.length + 2 > len) {
 									// cannot add tuple to current buffer
 									// but the reader has moved pass the fetched tuple
 									// we need to store this for a subsequent read call.
@@ -564,58 +426,779 @@
 									pendingValue = value;
 									break;
 								} else {
-									//copy fileNameFieldNameWithRecOpeningBraces
-									System.arraycopy(fileNameFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNameFieldNameWithRecOpeningBraces.length);
-									numBytes += fileNameFieldNameWithRecOpeningBraces.length;
-									//copy fileName
-									System.arraycopy(fileName, 0, buffer, offset + numBytes,fileName.length);
-									numBytes += fileName.length;
-									//copy bytelocationFieldName
-									System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
-									numBytes += bytelocationFieldName.length;
-									//copy byte location value
-									System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
-									numBytes += byteLocation.length;
-									//copy byte location field end 
-									System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
-									numBytes += bytelocationValueEnd.length;
-									//copy the actual adm instance
-									System.arraycopy(value.getBytes(), firstFieldLocation+1, buffer, offset + numBytes,admValueSize - firstFieldLocation - 1);
-									buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
-									numBytes += admValueSize - firstFieldLocation;
+									//copy filename
+									System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
+									buffer[offset + numBytes + fileName.length] = delimiter;
+									numBytes += fileName.length + 1;
+
+									//copy byte location
+									System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+									buffer[offset + numBytes + byteLocation.length] = delimiter;
+									numBytes += byteLocation.length + 1;
+
+									//Copy actual value
+									System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+									buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+									numBytes += sizeOfNextTuple;
 								}
 							}
+							return numBytes;
+						}
+
+						@Override
+						public int read() throws IOException {
+							throw new NotImplementedException("Use read(byte[], int, int)");
+						}
+
+						private RecordReader getRecordReader(int splitIndex) throws IOException {
+							if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+								SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							} else {
+								TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							}
+						}
+
+					};
+				}
+				else if((configuration.get(KEY_FORMAT).equals(FORMAT_ADM)))
+				{
+					//reading data and RIDs for adm formatted data
+					return new InputStream() {
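+						//emits each ADM record with two RID fields spliced in before its original first field: the file name and the byte offset of the record within the file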
+
+						private RecordReader<Object, Text> reader;
+						private Object key;
+						private Text value;
+						private boolean hasMore = false;
+						private int EOL = "\n".getBytes()[0];
+						private Text pendingValue = null;
+						private int currentSplitIndex = 0;
+						private byte[] fileName;
+						private byte[] byteLocation;
+
+						@SuppressWarnings("unchecked")
+						private boolean moveToNext() throws IOException {
+							for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+								/**
+								 * read all the partitions scheduled to the current node
+								 */
+								if (readSchedule[currentSplitIndex].equals(nodeName)) {
+									/**
+									 * pick an unread split to read
+									 * synchronize among simultaneous partitions in the same machine
+									 */
+									synchronized (executed) {
+										if (executed[currentSplitIndex] == false) {
+											executed[currentSplitIndex] = true;
+										} else {
+											continue;
+										}
+									}
+
+									/**
+									 * read the split
+									 */
+									reader = getRecordReader(currentSplitIndex);
+									key = reader.createKey();
+									value = (Text) reader.createValue();
+									fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
+									return true;
+								}
+							}
+							return false;
+						}
+
+						@Override
+						public int read(byte[] buffer, int offset, int len) throws IOException {
+							if (reader == null) {
+								if (!moveToNext()) {
+									//nothing to read
+									return -1;
+								}
+							}
+
+							int numBytes = 0;
+							if (pendingValue != null) {
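+								//pendingValue aliases the same Text instance as value, and value has not been overwritten yet, so the pending record is re-read from value here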
+								int firstFieldLocation = value.find("\"");
+								int admValueSize = value.getLength();
+								if(firstFieldLocation >= 0)
+								{
+									int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+									int sizeOfNextTupleAndRID = fileNameFieldNameWithRecOpeningBraces.length + fileName.length + fileNameFieldClosingQuotation.length + bytelocationFieldName.length  + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+									if (numBytes + sizeOfNextTupleAndRID > len) {
+										// still cannot add tuple to current buffer
+										// return 0 so the parser will double the buffer size.
+										return 0;
+									} else {
+										//copy fileNameFieldNameWithRecOpeningBraces
+										System.arraycopy(fileNameFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNameFieldNameWithRecOpeningBraces.length);
+										numBytes += fileNameFieldNameWithRecOpeningBraces.length;
+										//copy fileName
+										System.arraycopy(fileName, 0, buffer, offset + numBytes,fileName.length);
+										numBytes += fileName.length;
+										//copy fileName closing quotation
+										System.arraycopy(fileNameFieldClosingQuotation, 0, buffer, offset + numBytes,fileNameFieldClosingQuotation.length);
+										numBytes += fileNameFieldClosingQuotation.length;
+										//copy bytelocationFieldName
+										System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+										numBytes += bytelocationFieldName.length;
+										//copy byte location value
+										System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+										numBytes += byteLocation.length;
+										//copy byte location field end 
+										System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+										numBytes += bytelocationValueEnd.length;
+										//copy the actual adm instance
+										System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+										buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+										numBytes += admValueSize - firstFieldLocation + 1;
+									}
+								}
+								pendingValue = null;
+							}
+
+							while (numBytes < len) {
+								//get reader position before you actually read
+								byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+								hasMore = reader.next(key, value);
+								if (!hasMore) {
+									while (moveToNext()) {
+										//get reader position before you actually read
+										byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+										hasMore = reader.next(key, value);
+										if (hasMore) {
+											//move to the next non-empty split
+											break;
+										}
+									}
+								}
+								if (!hasMore) {
+									return (numBytes == 0) ? -1 : numBytes;
+								}
+								//get the index of the first field name
+								int firstFieldLocation = value.find("\"");
+								int admValueSize = value.getLength();
+								if(firstFieldLocation >= 0)
+								{
+									int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+									int sizeOfNextTupleAndRID = fileNameFieldNameWithRecOpeningBraces.length + fileName.length + fileNameFieldClosingQuotation.length + bytelocationFieldName.length  + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+									if (numBytes + sizeOfNextTupleAndRID > len) {
+										// cannot add tuple to current buffer
+										// but the reader has moved past the fetched tuple
+										// we need to store this for a subsequent read call.
+										// and return this then.
+										pendingValue = value;
+										break;
+									} else {
+										//copy fileNameFieldNameWithRecOpeningBraces
+										System.arraycopy(fileNameFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNameFieldNameWithRecOpeningBraces.length);
+										numBytes += fileNameFieldNameWithRecOpeningBraces.length;
+										//copy fileName
+										System.arraycopy(fileName, 0, buffer, offset + numBytes,fileName.length);
+										numBytes += fileName.length;
+										//copy fileName closing quotation
+										System.arraycopy(fileNameFieldClosingQuotation, 0, buffer, offset + numBytes,fileNameFieldClosingQuotation.length);
+										numBytes += fileNameFieldClosingQuotation.length;
+										//copy bytelocationFieldName
+										System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+										numBytes += bytelocationFieldName.length;
+										//copy byte location value
+										System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+										numBytes += byteLocation.length;
+										//copy byte location field end 
+										System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+										numBytes += bytelocationValueEnd.length;
+										//copy the actual adm instance
+										System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+										buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+										numBytes += admValueSize - firstFieldLocation + 1;
+									}
+								}
+							}
+							return numBytes;
+						}
+
+						@Override
+						public int read() throws IOException {
+							throw new NotImplementedException("Use read(byte[], int, int)");
+						}
+
+						private RecordReader getRecordReader(int splitIndex) throws IOException {
+							if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+								SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							} else {
+								TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							}
+						}
+
+					};
+				}
+				else
+				{
+					throw new IOException("Can't index " + configuration.get(KEY_FORMAT) + " input");
+				}
+			}
+		}
+		else
+		{
+			if(conf.getInputFormat() instanceof RCFileInputFormat)
+			{
+				//indexing rc input format
+				return new InputStream() {
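+					//emits each RC tuple prefixed with a three-part RID: the file number, the byte offset of the enclosing block, and the row number within that block, all delimiter-separated and terminated by EOL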
+
+					private RecordReader<LongWritable, BytesRefArrayWritable> reader;
+					private LongWritable key;
+					private BytesRefArrayWritable value;
+					private boolean hasMore = false;
+					private int EOL = "\n".getBytes()[0];
+					private byte delimiter = 0x01;
+					private boolean pendingValue = false;
+					private int currentSplitIndex = 0;
+					private byte[] fileNumber;
+					private byte[] byteLocation;
+					private byte[] rowNumberBytes;
+					private Integer file;
+					private long blockByteLocation;
+					private long NextblockByteLocation;
+					private int rowNumber;
+
+					@SuppressWarnings("unchecked")
+					private boolean moveToNext() throws IOException {
+						for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+							/**
+							 * read all the partitions scheduled to the current node
+							 */
+							if (readSchedule[currentSplitIndex].equals(nodeName)) {
+								/**
+								 * pick an unread split to read
+								 * synchronize among simultaneous partitions in the same machine
+								 */
+								synchronized (executed) {
+									if (executed[currentSplitIndex] == false) {
+										executed[currentSplitIndex] = true;
+									} else {
+										continue;
+									}
+								}
+
+								/**
+								 * read the split
+								 */
+								reader = getRecordReader(currentSplitIndex);
+								key = reader.createKey();
+								value = reader.createValue();
+								//getting the file number
+								file = files.get(((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath());
+								if(file == null)
+								{
+									throw new HyracksException("a file was not found in the map while indexing");
+								}
+								fileNumber = String.valueOf(file).getBytes("UTF-8");
+								blockByteLocation = reader.getPos();
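+								//read the first tuple of the split eagerly so its block location is captured; it is flagged as pending and emitted by the next read() call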
+								pendingValue = reader.next(key, value);
+								NextblockByteLocation = reader.getPos();
+								rowNumber = 1;
+								byteLocation = String.valueOf(blockByteLocation).getBytes("UTF-8");
+								rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+								return true;
+							}
+						}
+						return false;
+					}
+
+					@Override
+					public int read(byte[] buffer, int offset, int len) throws IOException {
+						if (reader == null) {
+							if (!moveToNext()) {
+								//nothing to read
+								return -1;
+							}
+						}
+
+						int numBytes = 0;
+						if (pendingValue) {
+							//last value didn't fit into buffer
+							// 1 for EOL
+							int sizeOfNextTuple = getTupleSize(value) + 1;
+							if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileNumber.length + 3 > len) {
+								return 0;
+							}
+							//copy file number
+							System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+							buffer[offset + numBytes + fileNumber.length] = delimiter;
+							numBytes += fileNumber.length + 1;
+
+							//copy byte location
+							System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+							buffer[offset + numBytes + byteLocation.length] = delimiter;
+							numBytes += byteLocation.length + 1;
+
+							//copy row number
+							System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
+							buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
+							numBytes += rowNumberBytes.length + 1;
+
+							copyCurrentTuple(buffer, offset + numBytes);
+							buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+							numBytes += sizeOfNextTuple;
+							//set pending to false
+							pendingValue = false;
+						}
+
+						while (numBytes < len) {
+							hasMore = reader.next(key, value);
+							if (!hasMore) {
+								while (moveToNext()) {
+									hasMore = reader.next(key, value);
+									if (hasMore) {
+										//move to the next non-empty split
+										break;
+									}
+								}
+							}
+							if (!hasMore) {
+								return (numBytes == 0) ? -1 : numBytes;
+							}
+
+							//check if moved to next block
+							blockByteLocation = reader.getPos();
+							if(blockByteLocation != NextblockByteLocation)
+							{
+								//moved to a new block, reset stuff
+								//row number
+								rowNumber = 1;
+								rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+
+								//block location
+								byteLocation = String.valueOf(NextblockByteLocation).getBytes("UTF-8");
+								NextblockByteLocation = blockByteLocation;
+							}
+							else
+							{
+								rowNumber += 1;
+								rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+							}
+
+							int sizeOfNextTuple = getTupleSize(value) + 1;
+							if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileNumber.length + 3 > len) {
+								// cannot add tuple to current buffer
+								// but the reader has moved past the fetched tuple
+								// we need to store this for a subsequent read call.
+								// and return this then.
+								pendingValue = true;
+								break;
+							} else {
+								//copy file number
+								System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+								buffer[offset + numBytes + fileNumber.length] = delimiter;
+								numBytes += fileNumber.length + 1;
+
+								//copy byte location
+								System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+								buffer[offset + numBytes + byteLocation.length] = delimiter;
+								numBytes += byteLocation.length + 1;
+
+								//copy row number
+								System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
+								buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
+								numBytes += rowNumberBytes.length + 1;
+
+								copyCurrentTuple(buffer, offset + numBytes);
+								buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+								numBytes += sizeOfNextTuple;
+							}
 						}
 						return numBytes;
 					}
 
+					private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+						int rcOffset = 0;
+						for(int i=0; i< value.size(); i++)
+						{
+							System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+							rcOffset += value.get(i).getLength() + 1;
+							buffer[offset + rcOffset - 1] = delimiter;
+						}
+					}
+
+					private int getTupleSize(BytesRefArrayWritable value2) {
+						int size = 0;
+						//loop over the rc columns and add their lengths
+						for(int i=0; i < value2.size(); i++)
+						{
+							size += value2.get(i).getLength();
+						}
+						//add the delimiters' byte sizes
+						size += value2.size() - 1;
+						return size;
+					}
+
 					@Override
 					public int read() throws IOException {
 						throw new NotImplementedException("Use read(byte[], int, int");
 					}
 
 					private RecordReader getRecordReader(int slitIndex) throws IOException {
-						if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-							SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-							RecordReader reader = format.getRecordReader(
-									(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
-							return reader;
-						} else {
-							TextInputFormat format = (TextInputFormat) conf.getInputFormat();
-							RecordReader reader = format.getRecordReader(
-									(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
-							return reader;
-						}
+						RCFileInputFormat format = (RCFileInputFormat) conf.getInputFormat();
+						RecordReader reader = format.getRecordReader(
+								(org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+						return reader;
 					}
 
 				};
 			}
 			else
 			{
-				throw new IOException("Can't index " +configuration.get(KEY_FORMAT)+" input");
+				//get content format
+				if(configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT))
+				{
+					//reading data and RIDs for delimited text
+					return new InputStream() {
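+						//emits each record as: file number (looked up in the files map), byte offset of the record within the file, then the original delimited tuple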
+
+						private RecordReader<Object, Text> reader;
+						private Object key;
+						private Text value;
+						private boolean hasMore = false;
+						private int EOL = "\n".getBytes()[0];
+						private Text pendingValue = null;
+						private int currentSplitIndex = 0;
+						private Integer file;
+						private byte[] fileNumber;
+						private byte[] byteLocation;
+						private byte delimiter = ((String)configuration.get(KEY_DELIMITER)).getBytes()[0];
+
+						@SuppressWarnings("unchecked")
+						private boolean moveToNext() throws IOException {
+							for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+								/**
+								 * read all the partitions scheduled to the current node
+								 */
+								if (readSchedule[currentSplitIndex].equals(nodeName)) {
+									/**
+									 * pick an unread split to read
+									 * synchronize among simultaneous partitions in the same machine
+									 */
+									synchronized (executed) {
+										if (executed[currentSplitIndex] == false) {
+											executed[currentSplitIndex] = true;
+										} else {
+											continue;
+										}
+									}
+
+									/**
+									 * read the split
+									 */
+									reader = getRecordReader(currentSplitIndex);
+									key = reader.createKey();
+									value = (Text) reader.createValue();
+									file = files.get(((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath());
+									if(file == null)
+									{
+										throw new HyracksException("The file: " + ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath() + " was not found in the map while indexing");
+									}
+									fileNumber = String.valueOf(file).getBytes("UTF-8");
+									return true;
+								}
+							}
+							return false;
+						}
+
+						@Override
+						public int read(byte[] buffer, int offset, int len) throws IOException {
+							if (reader == null) {
+								if (!moveToNext()) {
+									//nothing to read
+									return -1;
+								}
+							}
+
+							int numBytes = 0;
+							if (pendingValue != null) {
+								int sizeOfNextTuple = pendingValue.getLength() + 1;
+								if (numBytes + sizeOfNextTuple + byteLocation.length + fileNumber.length + 2 > len)
+								{
+									return numBytes;
+								}
+								//copy file number
+								System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+								buffer[offset + numBytes + fileNumber.length] = delimiter;
+								numBytes += fileNumber.length + 1;
+
+								//copy byte location
+								System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+								buffer[offset + numBytes + byteLocation.length] = delimiter;
+								numBytes += byteLocation.length + 1;
+
+								//copy actual value
+								System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+								buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+								numBytes += pendingValue.getLength() + 1;
+								pendingValue = null;
+							}
+
+							while (numBytes < len) {
+								//get reader position before you actually read
+								byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+								hasMore = reader.next(key, value);
+								if (!hasMore) {
+									while (moveToNext()) {
+										//get reader position before you actually read
+										byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+										hasMore = reader.next(key, value);
+										if (hasMore) {
+											//move to the next non-empty split
+											break;
+										}
+									}
+								}
+								if (!hasMore) {
+									return (numBytes == 0) ? -1 : numBytes;
+								}
+								int sizeOfNextTuple = value.getLength() + 1;
+								if (numBytes + sizeOfNextTuple + byteLocation.length + fileNumber.length + 2 > len) {
+									// cannot add tuple to current buffer
+									// but the reader has moved past the fetched tuple
+									// we need to store this for a subsequent read call.
+									// and return this then.
+									pendingValue = value;
+									break;
+								} else {
+									//copy file number
+									System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+									buffer[offset + numBytes + fileNumber.length] = delimiter;
+									numBytes += fileNumber.length + 1;
+
+									//copy byte location
+									System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+									buffer[offset + numBytes + byteLocation.length] = delimiter;
+									numBytes += byteLocation.length + 1;
+
+									//Copy actual value
+									System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+									buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+									numBytes += sizeOfNextTuple;
+								}
+							}
+							return numBytes;
+						}
+
+						@Override
+						public int read() throws IOException {
+							throw new NotImplementedException("Use read(byte[], int, int)");
+						}
+
+						private RecordReader getRecordReader(int splitIndex) throws IOException {
+							if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+								SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							} else {
+								TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							}
+						}
+
+					};
+				}
+				else if((configuration.get(KEY_FORMAT).equals(FORMAT_ADM)))
+				{
+					//reading data and RIDs for adm formatted data
+					return new InputStream() {
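+						//emits each ADM record with two RID fields spliced in before its original first field: the file number and the byte offset of the record within the file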
+
+						private RecordReader<Object, Text> reader;
+						private Object key;
+						private Text value;
+						private boolean hasMore = false;
+						private int EOL = "\n".getBytes()[0];
+						private Text pendingValue = null;
+						private int currentSplitIndex = 0;
+						private Integer file;
+						private byte[] fileNumber;
+						private byte[] byteLocation;
+
+						@SuppressWarnings("unchecked")
+						private boolean moveToNext() throws IOException {
+							for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+								/**
+								 * read all the partitions scheduled to the current node
+								 */
+								if (readSchedule[currentSplitIndex].equals(nodeName)) {
+									/**
+									 * pick an unread split to read
+									 * synchronize among simultaneous partitions in the same machine
+									 */
+									synchronized (executed) {
+										if (executed[currentSplitIndex] == false) {
+											executed[currentSplitIndex] = true;
+										} else {
+											continue;
+										}
+									}
+
+									/**
+									 * read the split
+									 */
+									reader = getRecordReader(currentSplitIndex);
+									key = reader.createKey();
+									value = (Text) reader.createValue();
+									file = files.get(((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath());
+									if(file == null)
+									{
+										throw new HyracksException("a file was not found in the map while indexing");
+									}
+									fileNumber = String.valueOf(file).getBytes("UTF-8");
+									return true;
+								}
+							}
+							return false;
+						}
+
+						@Override
+						public int read(byte[] buffer, int offset, int len) throws IOException {
+							if (reader == null) {
+								if (!moveToNext()) {
+									//nothing to read
+									return -1;
+								}
+							}
+
+							int numBytes = 0;
+							if (pendingValue != null) {
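+								//pendingValue aliases the same Text instance as value, so the pending record is re-read from value here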
+								int firstFieldLocation = value.find("\"");
+								int admValueSize = value.getLength();
+								if(firstFieldLocation >= 0)
+								{
+									int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+									int sizeOfNextTupleAndRID = fileNumberFieldNameWithRecOpeningBraces.length + fileNumber.length + bytelocationFieldName.length  + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+									if (numBytes + sizeOfNextTupleAndRID > len) {
+										// still cannot add tuple to current buffer
+										// return 0 so the parser will double the buffer size.
+										return 0;
+									} else {
+										//copy fileNumberFieldNameWithRecOpeningBraces
+										System.arraycopy(fileNumberFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNumberFieldNameWithRecOpeningBraces.length);
+										numBytes += fileNumberFieldNameWithRecOpeningBraces.length;
+										//copy file Number
+										System.arraycopy(fileNumber, 0, buffer, offset + numBytes,fileNumber.length);
+										numBytes += fileNumber.length;
+										//copy bytelocationFieldName
+										System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+										numBytes += bytelocationFieldName.length;
+										//copy byte location value
+										System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+										numBytes += byteLocation.length;
+										//copy byte location field end 
+										System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+										numBytes += bytelocationValueEnd.length;
+										//copy the actual adm instance
+										System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+										buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+										numBytes += admValueSize - firstFieldLocation + 1;
+									}
+								}
+								pendingValue = null;
+							}
+
+							while (numBytes < len) {
+								//get reader position before you actually read
+								byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+								hasMore = reader.next(key, value);
+								if (!hasMore) {
+									while (moveToNext()) {
+										//get reader position before you actually read
+										byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+										hasMore = reader.next(key, value);
+										if (hasMore) {
+											//move to the next non-empty split
+											break;
+										}
+									}
+								}
+								if (!hasMore) {
+									return (numBytes == 0) ? -1 : numBytes;
+								}
+								//get the index of the first field name
+								int firstFieldLocation = value.find("\"");
+								int admValueSize = value.getLength();
+								if(firstFieldLocation >= 0)
+								{
+									int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+									int sizeOfNextTupleAndRID = fileNumberFieldNameWithRecOpeningBraces.length + fileNumber.length + bytelocationFieldName.length  + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+									if (numBytes + sizeOfNextTupleAndRID > len) {
+										// cannot add tuple to current buffer
+										// but the reader has moved past the fetched tuple
+										// we need to store this for a subsequent read call.
+										// and return this then.
+										pendingValue = value;
+										break;
+									} else {
+										//copy fileNumberFieldNameWithRecOpeningBraces
+										System.arraycopy(fileNumberFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNumberFieldNameWithRecOpeningBraces.length);
+										numBytes += fileNumberFieldNameWithRecOpeningBraces.length;
+										//copy fileNumber
+										System.arraycopy(fileNumber, 0, buffer, offset + numBytes,fileNumber.length);
+										numBytes += fileNumber.length;
+										//copy bytelocationFieldName
+										System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+										numBytes += bytelocationFieldName.length;
+										//copy byte location value
+										System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+										numBytes += byteLocation.length;
+										//copy byte location field end 
+										System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+										numBytes += bytelocationValueEnd.length;
+										//copy the actual adm instance
+										System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+										buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+										numBytes += admValueSize - firstFieldLocation + 1;
+									}
+								}
+							}
+							return numBytes;
+						}
+
+						@Override
+						public int read() throws IOException {
+							throw new NotImplementedException("Use read(byte[], int, int)");
+						}
+
+						private RecordReader getRecordReader(int splitIndex) throws IOException {
+							if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+								SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							} else {
+								TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+								RecordReader reader = format.getRecordReader(
+										(org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+								return reader;
+							}
+						}
+
+					};
+				}
+				else
+				{
+					throw new IOException("Can't index " + configuration.get(KEY_FORMAT) + " input");
+				}
 			}
 		}
-
 	}
 
 	@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java
index 552e5ab..178b106 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java
@@ -28,8 +28,8 @@
     private HDFSIndexingAdapter hdfsIndexingAdapter;
 
     public HiveIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
-            AlgebricksPartitionConstraint clusterLocations) {
-        this.hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+            AlgebricksPartitionConstraint clusterLocations, Map<String,Integer> files) {
+        this.hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations, files);
         this.atype = atype;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/ExternalDataFilesMetadataProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/ExternalDataFilesMetadataProvider.java
new file mode 100644
index 0000000..47550a4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/ExternalDataFilesMetadataProvider.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.AbstractDatasourceAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
+
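+/**
+ * Helper for listing the HDFS file statuses of all paths that an external dataset adapter reads from.
+ */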
+public class ExternalDataFilesMetadataProvider {
+	public static ArrayList<FileStatus> getHDFSFileStatus(AbstractDatasourceAdapter adapter) throws IOException
+	{
+		ArrayList<FileStatus> files = new ArrayList<FileStatus>();
+		//Configure hadoop connection
+		Configuration conf = HDFSAdapterFactory.configureHadoopConnection(adapter.getConfiguration());
+		FileSystem fs = FileSystem.get(conf);
+		//get the list of paths from the adapter
+		StringTokenizer tokenizer = new StringTokenizer(((String)adapter.getConfiguration().get(HDFSAdapter.KEY_PATH)), ",");
+		Path inputPath = null;
+		FileStatus[] fileStatuses;
+		while(tokenizer.hasMoreTokens())
+		{
+			inputPath = new Path(tokenizer.nextToken().trim());
+			fileStatuses = fs.listStatus(inputPath);
+			for(int i=0; i < fileStatuses.length; i++)
+			{
+				files.add(fileStatuses[i]);
+			}
+		}
+		return files;
+	}
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 8f0eedb..2de5d78 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
@@ -268,6 +269,39 @@
         }
         return dataset;
     }
+
+    @Override
+    public List<ExternalFile> getDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset)
+            throws MetadataException {
+        List<ExternalFile> externalFiles;
+        try {
+            externalFiles = metadataNode.getExternalDatasetFiles(mdTxnCtx.getJobId(), dataset);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        return externalFiles;
+    }
+
+    @Override
+    public void addExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile)
+            throws MetadataException {
+        try {
+            metadataNode.addExternalDatasetFile(mdTxnCtx.getJobId(), externalFile);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public void dropExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile)
+            throws MetadataException {
+        try {
+            metadataNode.dropExternalFile(mdTxnCtx.getJobId(), externalFile.getDataverseName(),
+                    externalFile.getDatasetName(), externalFile.getFileNumber());
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+    }
 
     @Override
     public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String dataverseName, String datasetName)
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 14a0695..d9fed5b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -52,6 +53,7 @@
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
+import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
@@ -61,6 +63,7 @@
 import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
 import edu.uci.ics.asterix.metadata.valueextractors.NestedDatatypeNameValueExtractor;
 import edu.uci.ics.asterix.metadata.valueextractors.TupleCopyValueExtractor;
+import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.types.BuiltinType;
@@ -1153,6 +1156,113 @@
 			throw new MetadataException(e);
 		}
 	}
+	
+	@Override
+	public void addExternalDatasetFile(JobId jobId, ExternalFile externalFile)
+			throws MetadataException, RemoteException {
+		try {
+			// Insert into the 'externalFiles' dataset.
+			ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(true);
+			ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile);
+			insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple);
+		} catch (TreeIndexDuplicateKeyException e) {
+			throw new MetadataException("An external file with number " + externalFile.getFileNumber()
+					+ " already exists in dataset '" + externalFile.getDatasetName() + "' in dataverse '" + externalFile.getDataverseName() + "'.", e);
+		} catch (Exception e) {
+			throw new MetadataException(e);
+		}
+	}
+
+	@Override
+	public List<ExternalFile> getExternalDatasetFiles(JobId jobId,
+			Dataset dataset) throws MetadataException, RemoteException {
+		try {
+			ITupleReference searchKey = createTuple(dataset.getDataverseName(),dataset.getDatasetName());
+			ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(false);
+			IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<ExternalFile>(
+					tupleReaderWriter);
+			List<ExternalFile> results = new ArrayList<ExternalFile>();
+			searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
+			return results;
+		} catch (Exception e) {
+			throw new MetadataException(e);
+		}
+	}
+
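+	// Builds a search tuple over the ExternalFile primary index key: (DataverseName, DatasetName, FileNumber).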
+	@SuppressWarnings("unchecked")
+	public ITupleReference createExternalFileSearchTuple(String dataverseName, String datasetName, int fileNumber) throws HyracksDataException {
+		ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+				.getSerializerDeserializer(BuiltinType.ASTRING);
+		ISerializerDeserializer<AInt32> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+				.getSerializerDeserializer(BuiltinType.AINT32);
+
+		AMutableString aString = new AMutableString("");
+		ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(3);
+
+		//dataverse field
+		aString.setValue(dataverseName);
+		stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+		tupleBuilder.addFieldEndOffset();
+
+		//dataset field
+		aString.setValue(datasetName);
+		stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+		tupleBuilder.addFieldEndOffset();
+
+		//file number field
+		intSerde.serialize(new AInt32(fileNumber), tupleBuilder.getDataOutput());
+		tupleBuilder.addFieldEndOffset();
+
+		ArrayTupleReference tuple = new ArrayTupleReference();
+		tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+		return tuple;
+	}
+
+	public ExternalFile getExternalDatasetFile(JobId jobId, String dataverseName, String datasetName,
+			int fileNumber) throws MetadataException, RemoteException {
+		try {
+			//create the search key
+			ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
+			ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(false);
+			IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<ExternalFile>(
+					tupleReaderWriter);
+			List<ExternalFile> results = new ArrayList<ExternalFile>();
+			searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
+			return results.isEmpty() ? null : results.get(0);
+		} catch (Exception e) {
+			throw new MetadataException(e);
+		}
+	}
+	
+	@Override
+	public void dropExternalFile(JobId jobId, String dataverseName,
+			String datasetName, int fileNumber) throws MetadataException,
+			RemoteException {
+		ExternalFile externalFile;
+		try {
+			externalFile = getExternalDatasetFile(jobId, dataverseName, datasetName, fileNumber);
+		} catch (Exception e) {
+			throw new MetadataException(e);
+		}
+		if (externalFile == null) {
+			throw new MetadataException("Cannot drop external file because it doesn't exist.");
+		}
+		try {
+			// Delete entry from the 'ExternalFile' dataset.
+			ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
+			// Searches the index for the tuple to be deleted. Acquires an S
+			// lock on the 'ExternalFile' dataset.
+			ITupleReference datasetTuple = getTupleToBeDeleted(jobId,
+					MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey);
+			deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, datasetTuple);
+
+		} catch (TreeIndexException e) {
+			throw new MetadataException("Couldn't drop external file.", e);
+		} catch (Exception e) {
+			throw new MetadataException(e);
+		}
+	}
+
 
 	@Override
 	public int getMostRecentDatasetId() throws MetadataException, RemoteException {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 22c5e46..53f72dd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
@@ -439,6 +440,36 @@
      */
     public List<Function> getDataverseFunctions(MetadataTransactionContext ctx, String dataverseName)
             throws MetadataException;
+    
+    /**
+     * @param mdTxnCtx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param externalFile
+     *            An instance of type ExternalFile that represents the external file being
+     *            added
+     * @throws MetadataException
+     */
+    public void addExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile) throws MetadataException;
+    
+    /**
+     * @param mdTxnCtx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param dataset
+     *            An instance of type Dataset that represents the "external" dataset 
+     * @return A list of external files belonging to the dataset
+     * @throws MetadataException
+     */
+    public List<ExternalFile> getDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset) throws MetadataException;
+
+    /**
+     * @param mdTxnCtx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param externalFile
+     *            An instance of type ExternalFile that represents the external file being
+     *            dropped
+     * @throws MetadataException
+     */
+    public void dropExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile) throws MetadataException;
 
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
     
@@ -453,4 +484,5 @@
     public void releaseReadLatch();
 
 
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index d1e63e1..206ef8a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
@@ -471,6 +472,45 @@
      * @throws RemoteException
      */
     public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException;
+    
+    /**
+     * @param jobId
+     *            A globally unique id for an active metadata transaction.
+     * @param externalFile
+     *            An object representing the external file entity.
+     * @throws MetadataException
+     *             for example, if the file already exists.
+     * @throws RemoteException
+     */
+	public void addExternalDatasetFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException;
+
+    /**
+     * @param jobId
+     *            A globally unique id for an active metadata transaction.
+     * @param dataset
+     *            The dataset the files belong to.
+     * @return A list of external files belonging to the dataset.
+     * @throws MetadataException
+     * @throws RemoteException
+     */
+    public List<ExternalFile> getExternalDatasetFiles(JobId jobId, Dataset dataset) throws MetadataException,
+            RemoteException;
+
+    /**
+     * Deletes an external file, acquiring local locks on behalf of the given
+     * transaction id.
+     * 
+     * @param jobId
+     *            A globally unique id for an active metadata transaction.
+     * @param dataverseName
+     *            Dataverse associated with the external dataset that owns the file to be deleted.
+     * @param datasetName
+     *            Name of the dataset owning the file to be deleted.
+     * @param fileNumber
+     *            The number identifying the file to be deleted.
+     * @throws MetadataException
+     * @throws RemoteException
+     */
+    public void dropExternalFile(JobId jobId, String dataverseName, String datasetName, int fileNumber)
+            throws MetadataException, RemoteException;
 
     public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
     
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa976f8..9d795f5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -118,7 +118,7 @@
                 MetadataPrimaryIndexes.DATASET_DATASET, MetadataPrimaryIndexes.DATATYPE_DATASET,
                 MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
                 MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
-                MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET };
+                MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
         secondaryIndexes = new IMetadataIndex[] { MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
                 MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
                 MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX };
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 8bdd92b..100ec40 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -41,9 +41,11 @@
     public static final int NODEGROUP_DATASET_ID = 6;
     public static final int FUNCTION_DATASET_ID = 7;
     public static final int DATASOURCE_ADAPTER_DATASET_ID = 8;
+    public static final int EXTERNAL_FILE_DATASET_ID = 9;
     public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
 
     public static IMetadataIndex DATASOURCE_ADAPTER_DATASET;
+    public static IMetadataIndex EXTERNAL_FILE_DATASET;
 
     /**
      * Create all metadata primary index descriptors. MetadataRecordTypes must
@@ -92,5 +94,11 @@
                 BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name" }, 0,
                 MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
                         1 });
+        
+        EXTERNAL_FILE_DATASET = new MetadataIndex("ExternalFile", null, 4,
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 },
+                new String[] { "DataverseName", "DatasetName", "FileNumber" }, 0,
+                MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, EXTERNAL_FILE_DATASET_ID, true, new int[] { 0, 1, 2 });
     }
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index d3eee76..11f9c91 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -47,6 +47,7 @@
     public static ARecordType NODEGROUP_RECORDTYPE;
     public static ARecordType FUNCTION_RECORDTYPE;
     public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
+    public static ARecordType EXTERNAL_FILE_RECORDTYPE;
 
     /**
      * Create all metadata record types.
@@ -76,6 +77,7 @@
             NODEGROUP_RECORDTYPE = createNodeGroupRecordType();
             FUNCTION_RECORDTYPE = createFunctionRecordType();
             DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
+            EXTERNAL_FILE_RECORDTYPE = createExternalFileRecordType();
         } catch (AsterixException e) {
             throw new MetadataException(e);
         }
@@ -357,5 +359,19 @@
                 BuiltinType.ASTRING };
         return new ARecordType("DatasourceAdapterRecordType", fieldNames, fieldTypes, true);
     }
+    
+    public static final int EXTERNAL_FILE_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
+    public static final int EXTERNAL_FILE_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
+    public static final int EXTERNAL_FILE_ARECORD_FILE_NUMBER_FIELD_INDEX = 2;
+    public static final int EXTERNAL_FILE_ARECORD_FILE_NAME_FIELD_INDEX = 3;
+    public static final int EXTERNAL_FILE_ARECORD_FILE_SIZE_FIELD_INDEX = 4;
+    public static final int EXTERNAL_FILE_ARECORD_FILE_MOD_DATE_FIELD_INDEX = 5;
+    
+    private static ARecordType createExternalFileRecordType() throws AsterixException {
+        String[] fieldNames = { "DataverseName", "DatasetName", "FileNumber", "FileName", "FileSize", "FileModDate" };
+        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32, BuiltinType.ASTRING,
+                BuiltinType.AINT64, BuiltinType.ADATETIME };
+        return new ARecordType("ExternalFileRecordType", fieldNames, fieldTypes, true);
+    }
 
 }
\ No newline at end of file
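
For concreteness, an illustrative entry of the new ExternalFileRecordType, expressed through the ExternalFile entity added later in this patch (all values are made up):

    // Key fields: (DataverseName, DatasetName, FileNumber); the payload adds FileName, FileSize, FileModDate.
    ExternalFile example = new ExternalFile(
            "MyDataverse",       // DataverseName (ASTRING)
            "MyExternalDataset", // DatasetName   (ASTRING)
            new Date(),          // FileModDate   (ADATETIME)
            1024L,               // FileSize      (AINT64)
            "/data/part-00000",  // FileName      (ASTRING)
            0);                  // FileNumber    (AINT32)
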
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 98320ce..d37bf8f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -18,11 +18,14 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
 
+import org.apache.hadoop.fs.FileStatus;
+
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
@@ -46,10 +49,12 @@
 import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
 import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
 import edu.uci.ics.asterix.external.data.operator.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.AbstractDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
+import edu.uci.ics.asterix.external.util.ExternalDataFilesMetadataProvider;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
@@ -63,6 +68,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -158,8 +164,9 @@
     private boolean asyncResults;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+    private static boolean optimizeExternalIndexes = false;
 
     private final Dataverse defaultDataverse;
     private JobId jobId;
 
     private final AsterixStorageProperties storageProperties;
@@ -167,22 +174,6 @@
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
     private static Scheduler hdfsScheduler;
 
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
-    }
-
-    public void setConfig(Map<String, String> config) {
-        this.config = config;
-    }
-
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
-    public Map<String, String> getConfig() {
-        return config;
-    }
-
     public AqlMetadataProvider(Dataverse defaultDataverse) {
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
@@ -262,6 +253,30 @@
     public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
         return resultSerializerFactoryProvider;
     }
+    
+    public String getPropertyValue(String propertyName) {
+        return config.get(propertyName);
+    }
+
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    public Map<String, String[]> getAllStores() {
+        return stores;
+    }
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public static boolean isOptimizeExternalIndexes() {
+        return optimizeExternalIndexes;
+    }
+
+    public static void setOptimizeExternalIndexes(boolean optimizeExternalIndexes) {
+        AqlMetadataProvider.optimizeExternalIndexes = optimizeExternalIndexes;
+    }
 
     @Override
     public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
@@ -403,13 +418,93 @@
     
     @SuppressWarnings("rawtypes")
 	public Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataIndexingRuntime(
-			JobSpecification jobSpec, IAType itemType, ExternalDatasetDetails datasetDetails, IDataFormat format)
+			JobSpecification jobSpec, IAType itemType, Dataset dataset, IDataFormat format)
 					throws AlgebricksException {
 		IGenericDatasetAdapterFactory adapterFactory;
 		IDatasourceAdapter adapter;
 		String adapterName;
 		DatasourceAdapter adapterEntity;
 		String adapterFactoryClassname;
+        ExternalDatasetDetails datasetDetails = null;
+        try {
+            datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            adapterName = datasetDetails.getAdapter();
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    adapterName);
+            if (adapterEntity != null) {
+                adapterFactoryClassname = adapterEntity.getClassname();
+            } else {
+                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+                if (adapterFactoryClassname == null) {
+                    throw new AlgebricksException("Unknown adapter: " + adapterName);
+                }
+            }
+            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            adapter = adapterFactory.createIndexingAdapter(wrapProperties(datasetDetails.getProperties()), itemType,
+                    null);
+        } catch (AlgebricksException ae) {
+            throw ae;
+        } catch (Exception e) {
+            throw new AlgebricksException("Unable to create adapter", e);
+        }
+        if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+                IDatasourceAdapter.AdapterType.READ_WRITE))) {
+            throw new AlgebricksException("External dataset adapter does not support read operation");
+        }
+        ARecordType rt = (ARecordType) itemType;
+        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor indexerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+        // With external-index optimization on, build a (file name -> file number) map so the indexer
+        // can store a compact int file number in each RID instead of the full file name.
+        HashMap<String, Integer> filesNumbers = null;
+        if (optimizeExternalIndexes) {
+            List<ExternalFile> files;
+            try {
+                files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+            } catch (MetadataException e) {
+                throw new AlgebricksException("Unable to get list of external files from metadata", e);
+            }
+            filesNumbers = new HashMap<String, Integer>();
+            for (ExternalFile file : files) {
+                filesNumbers.put(file.getFileName(), file.getFileNumber());
+            }
+        }
+        // A null map makes the operator fall back to full file names in the RIDs.
+        ExternalDataIndexingOperatorDescriptor dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+                wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory, filesNumbers);
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapter.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner,
+                constraint);
+    }
+    
+    /**
+     * Lists the HDFS files that make up the given external dataset, assigning each a file number.
+     */
+    public ArrayList<ExternalFile> getExternalDatasetFiles(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
+            throw new AlgebricksException("Can only get the files of an external dataset");
+        }
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        IGenericDatasetAdapterFactory adapterFactory;
+        IDatasourceAdapter adapter;
+        String adapterName;
+        DatasourceAdapter adapterEntity;
+        String adapterFactoryClassname;
 		try {
 			adapterName = datasetDetails.getAdapter();
 			adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -425,30 +520,28 @@
 				adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
 			}
 
-			adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
-					wrapProperties(datasetDetails.getProperties()), itemType);
-		} catch (AlgebricksException ae) {
-			throw ae;
-		} catch (Exception e) {
+            adapter = adapterFactory.createAdapter(wrapProperties(datasetDetails.getProperties()), null);
+        } catch (Exception e) {
 			e.printStackTrace();
 			throw new AlgebricksException("Unable to create adapter " + e);
 		}
-		if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
-				IDatasourceAdapter.AdapterType.READ_WRITE))) {
-			throw new AlgebricksException("external dataset adapter does not support read operation");
-		}
-		ARecordType rt = (ARecordType) itemType;
-		ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-		RecordDescriptor recordDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-		ExternalDataIndexingOperatorDescriptor dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
-				wrapPropertiesEmpty(datasetDetails.getProperties()), rt, recordDesc, adapterFactory);
-		AlgebricksPartitionConstraint constraint;
+
 		try {
-			constraint = adapter.getPartitionConstraint();
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
+            ArrayList<FileStatus> fileStatuses = ExternalDataFilesMetadataProvider
+                    .getHDFSFileStatus((AbstractDatasourceAdapter) adapter);
+            for (int i = 0; i < fileStatuses.size(); i++) {
+                FileStatus fileStatus = fileStatuses.get(i);
+                // The position in the listing becomes the file number used in compact RIDs.
+                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(),
+                        new Date(fileStatus.getModificationTime()), fileStatus.getLen(),
+                        fileStatus.getPath().toUri().getPath(), i));
+            }
+            return files;
+        } catch (IOException e) {
+            throw new AlgebricksException("Unable to get list of HDFS files", e);
 		}
-		return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner, constraint);
 	}
 
 	@SuppressWarnings("rawtypes")
@@ -503,10 +596,33 @@
 		IDataFormat format = NonTaggedDataFormat.INSTANCE;
 		ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
 		RecordDescriptor outRecDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-		ExternalDataAccessByRIDOperatorDescriptor dataAccessor = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
-				itemType, outRecDesc, adapterFactory);
+
+        // With external-index optimization on, build a (file number -> file name) map so RIDs that
+        // carry compact file numbers can be resolved back to the files they point into.
+        HashMap<Integer, String> filesMapping = null;
+        if (optimizeExternalIndexes) {
+            List<ExternalFile> files;
+            try {
+                files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+            } catch (MetadataException e) {
+                throw new AlgebricksException("Couldn't get file names for access by optimized RIDs", e);
+            }
+            filesMapping = new HashMap<Integer, String>();
+            for (ExternalFile file : files) {
+                filesMapping.put(file.getFileNumber(), file.getFileName());
+            }
+        }
+        ExternalDataAccessByRIDOperatorDescriptor dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(
+                jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()), itemType, outRecDesc, adapterFactory,
+                filesMapping);
 		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),dataset.getDatasetName(),secondaryIndex.getIndexName());
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessor, splitsAndConstraints.second);
+		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessOperator, splitsAndConstraints.second);
 	}
 
     @SuppressWarnings("rawtypes")
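
The (file name -> file number) map built for indexing and the (file number -> file name) map built for access by RID are inverses of each other; a condensed sketch (not part of the patch) of the mechanism:

    // Sketch only: the optimization swaps file names in RIDs for compact int file numbers.
    List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
    HashMap<String, Integer> nameToNumber = new HashMap<String, Integer>(); // used while building the index
    HashMap<Integer, String> numberToName = new HashMap<Integer, String>(); // used while accessing by RID
    for (ExternalFile file : files) {
        nameToNumber.put(file.getFileName(), file.getFileNumber());
        numberToName.put(file.getFileNumber(), file.getFileName());
    }
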
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalFile.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalFile.java
new file mode 100644
index 0000000..0128783
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalFile.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.asterix.metadata.entities;
+
+import java.util.Date;
+
+import edu.uci.ics.asterix.metadata.MetadataCache;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+
+/**
+ * A class for the metadata entity externalFile.
+ * An instance represents a single file of an external dataset and is intended
+ * for use with external data indexes.
+ */
+public class ExternalFile implements IMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private String dataverseName;
+    private String datasetName;
+    private Date lastModifiedTime;
+    private long size;
+    private String fileName;
+    private int fileNumber;
+
+    public ExternalFile(String dataverseName, String datasetName, Date lastModifiedTime, long size, String fileName,
+            int fileNumber) {
+        this.dataverseName = dataverseName;
+        this.datasetName = datasetName;
+        this.lastModifiedTime = lastModifiedTime;
+        this.size = size;
+        this.fileName = fileName;
+        this.fileNumber = fileNumber;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public void setDataverseName(String dataverseName) {
+        this.dataverseName = dataverseName;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public void setDatasetName(String datasetName) {
+        this.datasetName = datasetName;
+    }
+
+    public Date getLastModifiedTime() {
+        return lastModifiedTime;
+    }
+
+    public void setLastModifiedTime(Date lastModifiedTime) {
+        this.lastModifiedTime = lastModifiedTime;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    public int getFileNumber() {
+        return fileNumber;
+    }
+
+    public void setFileNumber(int fileNumber) {
+        this.fileNumber = fileNumber;
+    }
+
+    @Override
+    public Object addToCache(MetadataCache cache) {
+        //return cache.addExternalFileIfNotExists(this);
+        return null;
+    }
+
+    @Override
+    public Object dropFromCache(MetadataCache cache) {
+        //cache.dropExternalFile(this);
+        return null;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null)
+            return false;
+        if (obj == this)
+            return true;
+        if (!(obj instanceof ExternalFile))
+            return false;
+        // Two files are equal iff they have the same file number and file name.
+        ExternalFile anotherFile = (ExternalFile) obj;
+        if (fileNumber != anotherFile.fileNumber)
+            return false;
+        if (!fileName.equals(anotherFile.fileName))
+            return false;
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        // Consistent with equals(), which compares fileNumber and fileName.
+        return 31 * fileNumber + fileName.hashCode();
+    }
+
+}
\ No newline at end of file
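
One behavioral note on the entity above, shown as a small example (values are illustrative): equality is decided by file number and file name only; dataverse, dataset, size, and modification time do not participate.

    ExternalFile a = new ExternalFile("DvA", "DsA", new Date(0L), 10L, "/data/f1", 1);
    ExternalFile b = new ExternalFile("DvB", "DsB", new Date(1L), 99L, "/data/f1", 1);
    assert a.equals(b); // same file number and file name => equal
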
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
new file mode 100644
index 0000000..6837c72
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
@@ -0,0 +1,158 @@
+package edu.uci.ics.asterix.metadata.entitytupletranslators;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.om.base.ADateTime;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ExternalFileTupleTranslator extends AbstractTupleTranslator<ExternalFile> {
+
+    // Field indexes of serialized ExternalFile in a tuple.
+    // First key field.
+    public static final int EXTERNAL_FILE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+    // Second key field.
+    public static final int EXTERNAL_FILE_DATASETNAME_TUPLE_FIELD_INDEX = 1;
+    // Third key field
+    public static final int EXTERNAL_FILE_NUMBER_TUPLE_FIELD_INDEX = 2;
+    // Payload field containing serialized ExternalFile.
+    public static final int EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+    protected AMutableInt32 aInt32 = new AMutableInt32(0);
+    protected AMutableDateTime aDateTime = new AMutableDateTime(0);
+    protected AMutableInt64 aInt64 = new AMutableInt64(0);
+    
+    @SuppressWarnings("unchecked")
+	protected ISerializerDeserializer<AInt32> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+	protected ISerializerDeserializer<ADateTime> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADATETIME);
+    @SuppressWarnings("unchecked")
+	protected ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE);
+    
+    public ExternalFileTupleTranslator(boolean getTuple) {
+        super(getTuple, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET.getFieldCount());
+    }
+
+    @Override
+    public ExternalFile getMetadataEntytiFromTuple(ITupleReference tuple) throws MetadataException, IOException {
+        // (The method name's spelling follows the abstract method declared in AbstractTupleTranslator.)
+        byte[] serRecord = tuple.getFieldData(EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = tuple.getFieldStart(EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = tuple.getFieldLength(EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord externalFileRecord = (ARecord) recordSerDes.deserialize(in);
+        return createExternalFileFromARecord(externalFileRecord);
+    }
+
+    private ExternalFile createExternalFileFromARecord(ARecord externalFileRecord) {
+        String dataverseName = ((AString) externalFileRecord
+                .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATAVERSENAME_FIELD_INDEX)).getStringValue();
+        String datasetName = ((AString) externalFileRecord
+                .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
+        String fileName = ((AString) externalFileRecord
+                .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NAME_FIELD_INDEX)).getStringValue();
+        int fileNumber = ((AInt32) externalFileRecord
+                .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NUMBER_FIELD_INDEX)).getIntegerValue();
+        Date lastModifiedDate = new Date(((ADateTime) externalFileRecord
+                .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_MOD_DATE_FIELD_INDEX)).getChrononTime());
+        long fileSize = ((AInt64) externalFileRecord
+                .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_SIZE_FIELD_INDEX)).getLongValue();
+
+        return new ExternalFile(dataverseName, datasetName, lastModifiedDate, fileSize, fileName, fileNumber);
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(ExternalFile externalFile) throws MetadataException,
+            IOException {
+        // write the key in the first 3 fields of the tuple
+        tupleBuilder.reset();
+        //dataverse name
+        aString.setValue(externalFile.getDataverseName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        //dataset name
+        aString.setValue(externalFile.getDatasetName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        //file number
+        aInt32.setValue(externalFile.getFileNumber());
+        intSerde.serialize(aInt32, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        
+        // write the pay-load in the fourth field of the tuple
+        recordBuilder.reset(MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(externalFile.getDataverseName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(externalFile.getDatasetName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATASET_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 2
+        fieldValue.reset();
+        aInt32.setValue(externalFile.getFileNumber());
+        intSerde.serialize(aInt32, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NUMBER_FIELD_INDEX, fieldValue);
+
+        // write field 3
+        fieldValue.reset();
+        aString.setValue(externalFile.getFileName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 4
+        fieldValue.reset();
+        aInt64.setValue(externalFile.getSize());
+        longSerde.serialize(aInt64, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_SIZE_FIELD_INDEX, fieldValue);
+
+        // write field 5
+        fieldValue.reset();
+        aDateTime.setValue(externalFile.getLastModifiedTime().getTime());
+        dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_MOD_DATE_FIELD_INDEX, fieldValue);
+        
+        // write record
+        try {
+            recordBuilder.write(tupleBuilder.getDataOutput(), true);
+        } catch (AsterixException e) {
+            throw new MetadataException(e);
+        }
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+}
\ No newline at end of file
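
A round-trip sketch for the translator above (not part of the patch); it assumes that the boolean constructor argument asks the translator to allocate an output tuple, as with the other translators in this package:

    ExternalFileTupleTranslator translator = new ExternalFileTupleTranslator(true);
    ExternalFile original = new ExternalFile("MyDataverse", "MyExternalDataset", new Date(), 1024L,
            "/data/part-00000", 0);
    ITupleReference tuple = translator.getTupleFromMetadataEntity(original);
    ExternalFile copy = translator.getMetadataEntytiFromTuple(tuple);
    assert original.equals(copy); // file number and file name survive the round trip
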
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
index 534c183..95d26d9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -96,18 +97,32 @@
     	ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();	
     	if (edd.getProperties().get(KEY_INPUT_FORMAT).trim().equals(INPUT_FORMAT_RC))
     	{
-    		//RID: <String(File name), Int64(Block byte location), Int32(row number)>
+            // RID: <String(File name) or Int32(File number), Int64(Block byte location), Int32(row number)>
     		IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[3];
-			bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+            if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+                bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32);
+            } else {
+                bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+            }
 			bhffs[1] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT64);
 			bhffs[2] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32);
 			return bhffs;
     	}
 		else
 		{
-			//RID: <String(File name), Int64(Record byte location)>
+            // RID: <String(File name) or Int32(File number), Int64(Record byte location)>
 			IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[2];
-			bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+            if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+                bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32);
+            } else {
+                bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+            }
 			bhffs[1] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT64);
 			return bhffs;
 		}
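
The two branches above differ only in the hash family of the first RID component; summarized (not part of the patch), the RID layouts are:

    // optimized:     < AINT32 fileNumber, AINT64 byteLocation [, AINT32 rowNumber for RC files] >
    // non-optimized: < ASTRING fileName,  AINT64 byteLocation [, AINT32 rowNumber for RC files] >
    IBinaryHashFunctionFactory first = AqlMetadataProvider.isOptimizeExternalIndexes()
            ? hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32)
            : hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
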
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 23d4ef4..c576cf8 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -72,7 +73,7 @@
     
     @Override
 	public IDatasourceAdapter createIndexingAdapter(
-			Map<String, Object> configuration, IAType atype) throws Exception {
+            Map<String, Object> configuration, IAType atype, Map<String, Integer> files) throws Exception {
 		throw new NotImplementedException("Rate Controlled Indexing Adapter is not implemented for feeds");
 	}
 
@@ -93,7 +94,7 @@
 
 	@Override
 	public IControlledAdapter createAccessByRIDAdapter(
-			Map<String, Object> configuration, IAType atype) throws Exception {
+            Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
 		throw new NotImplementedException("Rate Controlled Access by RID Adapter is not implemented for feeds");
 	}