Merge branch 'master' into raman/asterix_lsm_stabilization_coredump
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
 
 <configuration>
 
-  <property>
-    <name>mapred.job.tracker</name>
-    <value>localhost:29007</value>
-  </property>
-  <property>
-     <name>mapred.tasktracker.map.tasks.maximum</name>
-     <value>20</value>
-  </property>
-   <property>
-      <name>mapred.tasktracker.reduce.tasks.maximum</name>
-      <value>20</value>
-   </property>
-   <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
-   </property>
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>128</value>
+	</property>
 
 </configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
 use dataverse test;
 
 for $x in dataset('TextDataset')
+order by $x.line
 return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
 { "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
 { "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing  to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-doc/pom.xml b/asterix-doc/pom.xml
index d987e5f..4b8cb2f 100644
--- a/asterix-doc/pom.xml
+++ b/asterix-doc/pom.xml
@@ -1,21 +1,21 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<artifactId>asterix</artifactId>
-		<groupId>edu.uci.ics.asterix</groupId>
-		<version>0.0.6-SNAPSHOT</version>
-	</parent>
-	<artifactId>asterix-doc</artifactId>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-site-plugin</artifactId>
-				<version>3.3</version>
-				<configuration>
-				  <generateReports>false</generateReports>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>asterix</artifactId>
+    <groupId>edu.uci.ics.asterix</groupId>
+    <version>0.0.6-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-doc</artifactId>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <generateReports>false</generateReports>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/asterix-doc/src/site/markdown/AsterixDBRestAPI.md b/asterix-doc/src/site/markdown/api.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBRestAPI.md
rename to asterix-doc/src/site/markdown/api.md
diff --git a/asterix-doc/src/site/markdown/AsterixDBDataModel.md b/asterix-doc/src/site/markdown/aql/datamodel.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBDataModel.md
rename to asterix-doc/src/site/markdown/aql/datamodel.md
diff --git a/asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md b/asterix-doc/src/site/markdown/aql/externaldata.md
similarity index 98%
rename from asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md
rename to asterix-doc/src/site/markdown/aql/externaldata.md
index 7319094..e603954 100644
--- a/asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md
+++ b/asterix-doc/src/site/markdown/aql/externaldata.md
@@ -10,7 +10,7 @@
 
 As an example we consider the Lineitem dataset from [TPCH schema](http://www.openlinksw.com/dataspace/doc/dav/wiki/Main/VOSTPCHLinkedData/tpch.sql).
 
-We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](InstallingAsterixUsingManagix.html).
+We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](../install.html).
 _For constructing an example, we assume a single machine setup._
 
 Similar to a regular dataset, an external dataset has an associated datatype.  We shall first create the datatype associated with each record in Lineitem data.
diff --git a/asterix-doc/src/site/markdown/AsterixDBFunctions.md b/asterix-doc/src/site/markdown/aql/functions.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBFunctions.md
rename to asterix-doc/src/site/markdown/aql/functions.md
diff --git a/asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md b/asterix-doc/src/site/markdown/aql/manual.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md
rename to asterix-doc/src/site/markdown/aql/manual.md
diff --git a/asterix-doc/src/site/markdown/AdmAql101.md b/asterix-doc/src/site/markdown/aql/primer.md
similarity index 99%
rename from asterix-doc/src/site/markdown/AdmAql101.md
rename to asterix-doc/src/site/markdown/aql/primer.md
index 994c464..9046527 100644
--- a/asterix-doc/src/site/markdown/AdmAql101.md
+++ b/asterix-doc/src/site/markdown/aql/primer.md
@@ -12,7 +12,7 @@
 Most importantly, it assumes you already have a running instance of AsterixDB and that you know how to query
 it using AsterixDB's basic web interface.
 For more information on these topics, you should go through the steps in 
-[Installing Asterix Using Managix](InstallingAsterixUsingManagix.html)
+[Installing Asterix Using Managix](../install.html)
 before reading this document and make sure that you have a running AsterixDB instance ready to go.
 To get your feet wet, you should probably start with a simple local installation of AsterixDB on your favorite
 machine, accepting all of the default settings that Managix offers.
@@ -350,7 +350,7 @@
 in the not-too-distant future we will also provide a complete reference manual for the language.
 In the meantime, this will get you started down the path of using AsterixDB.
 A more complete list of the supported AsterixDB primitive types and built-in functions can be
-found at [AsterixDataTypesAndFunctions](AsterixDataTypesAndFunctions.html).
+found at [AsterixDataTypesAndFunctions](functions.html).
 
 AQL is an expression language.
 Even the expression 1+1 is a valid AQL query that evaluates to 2.
diff --git a/asterix-doc/src/site/markdown/SimilarityQuery.md b/asterix-doc/src/site/markdown/aql/similarity.md
similarity index 86%
rename from asterix-doc/src/site/markdown/SimilarityQuery.md
rename to asterix-doc/src/site/markdown/aql/similarity.md
index 7f6f8ae..244103c 100644
--- a/asterix-doc/src/site/markdown/SimilarityQuery.md
+++ b/asterix-doc/src/site/markdown/aql/similarity.md
@@ -19,7 +19,7 @@
 AsterixDB supports [edit distance](http://en.wikipedia.org/wiki/Levenshtein_distance) (on strings) and
 [Jaccard](http://en.wikipedia.org/wiki/Jaccard_index) (on sets).  For
 instance, in our
-[TinySocial](AdmAql101.html#ADM:_Modeling_Semistructed_Data_in_AsterixDB)
+[TinySocial](primer.html#ADM:_Modeling_Semistructed_Data_in_AsterixDB)
 example, the `friend-ids` of a Facebook user forms a set
 of friends, and we can define a similarity between the sets of
 friends of two users. We can also convert a string to a set of grams of a length "n"
@@ -29,13 +29,13 @@
 `schwarzenegger` are `sch`, `chw`, `hwa`, ..., `ger`.
 
 AsterixDB provides
-[tokenization functions](AsterixDBFunctions.html#Tokenizing_Functions)
+[tokenization functions](functions.html#Tokenizing_Functions)
 to convert strings to sets, and the
-[similarity functions](AsterixDBFunctions.html#Similarity_Functions).
+[similarity functions](functions.html#Similarity_Functions).
 
 ## Similarity Selection Queries ##
 
-The following [query](AsterixDBFunctions.html#edit-distance)
+The following [query](functions.html#edit-distance)
 asks for all the Facebook users whose name is similar to
 `Suzanna Tilson`, i.e., their edit distance is at most 2.
 
@@ -47,7 +47,7 @@
         return $user
 
 
-The following [query](AsterixDBFunctions.html#similarity-jaccard)
+The following [query](functions.html#similarity-jaccard)
 asks for all the Facebook users whose set of friend ids is
 similar to `[1,5,9]`, i.e., their Jaccard similarity is at least 0.6.
 
@@ -81,7 +81,7 @@
 ## Similarity Join Queries ##
 
 AsterixDB supports fuzzy joins between two sets. The following
-[query](AdmAql101.html#Query_5_-_Fuzzy_Join)
+[query](primer.html#Query_5_-_Fuzzy_Join)
 finds, for each Facebook user, all Twitter users with names
 similar to their name based on the edit distance.
 
@@ -129,13 +129,13 @@
 The number "3" in "ngram(3)" is the length "n" in the grams. This
 index can be used to optimize similarity queries on this attribute
 using 
-[edit-distance](AsterixDBFunctions.html#edit-distance), 
-[edit-distance-check](AsterixDBFunctions.html#edit-distance-check), 
-[jaccard](AsterixDBFunctions.html#similarity-jaccard),
-or [jaccard-check](AsterixDBFunctions.html#similarity-jaccard-check) 
+[edit-distance](functions.html#edit-distance), 
+[edit-distance-check](functions.html#edit-distance-check), 
+[jaccard](functions.html#similarity-jaccard),
+or [jaccard-check](functions.html#similarity-jaccard-check) 
 queries on this attribute where the
 similarity is defined on sets of 3-grams.  This index can also be used
-to optimize queries with the "[contains()]((AsterixDBFunctions.html#contains))" predicate (i.e., substring
+to optimize queries with the "[contains()]((functions.html#contains))" predicate (i.e., substring
 matching) since it can be also be solved by counting on the inverted
 lists of the grams in the query string.
 
@@ -169,6 +169,6 @@
         return $c
         
 As shown above, keyword index can be used to optimize queries with token-based similarity predicates, including
-[similarity-jaccard](AsterixDBFunctions.html#similarity-jaccard) and
-[similarity-jaccard-check](AsterixDBFunctions.html#similarity-jaccard-check).
+[similarity-jaccard](functions.html#similarity-jaccard) and
+[similarity-jaccard-check](functions.html#similarity-jaccard-check).
 
diff --git a/asterix-doc/src/site/markdown/index.md b/asterix-doc/src/site/markdown/index.md
index aeb57b4..f14c1e9 100644
--- a/asterix-doc/src/site/markdown/index.md
+++ b/asterix-doc/src/site/markdown/index.md
@@ -32,23 +32,23 @@
 For the Beta release, we've got a start; for the Beta release a month or so from now, we will hopefully have much more.
 The following is a list of the wiki pages and supporting documents that we have available today:
 
-1. [InstallingAsterixUsingManagix](InstallingAsterixUsingManagix.html) :
+1. [InstallingAsterixUsingManagix](install.html) :
 This is our installation guide, and it is where you should start.
 This document will tell you how to obtain, install, and manage instances of [AsterixDB](https://asterixdb.googlecode.com/files/asterix-installer-0.0.4-binary-assembly.zip), including both single-machine setup (for developers) as well as cluster installations (for deployment in its intended form).
 
-2. [AdmAql101](AdmAql101.html) :
+2. [AdmAql101](aql/primer.html) :
 This is a first-timers introduction to the user model of the AsterixDB BDMS, by which we mean the view of AsterixDB as seen from the perspective of an "average user" or Big Data application developer.
 The AsterixDB user model consists of its data modeling features (ADM) and its query capabilities (AQL).
 This document presents a tiny "social data warehousing" example and uses it as a backdrop for describing, by example, the key features of AsterixDB.
 By working through this document, you will learn how to define the artifacts needed to manage data in AsterixDB, how to load data into the system, how to use most of the basic features of its query language, and how to insert and delete data dynamically.
 
-3. [AsterixDataTypesAndFunctions](AsterixDataTypesAndFunctions.html) :
+3. [AsterixDataTypesAndFunctions](aql/functions.html) :
 This is a reference document that catalogs the primitive data types and built-in functions available for use in AsterixDB schemas (in ADM) and queries (in AQL).
 
-4. [AQL Reference](AsterixQueryLanguageReference.html) :
+4. [AQL Reference](aql/manual.html) :
 This is the AQL language reference manual.
 
-5. [AsterixDBRestAPI](AsterixDBRestAPI.html) :
+5. [AsterixDBRestAPI](api.html) :
 Access to data in an AsterixDB instance is provided via a REST-based API.
 This is a short document that describes the REST API entry points and their URL syntax.
 
diff --git a/asterix-doc/src/site/markdown/InstallingAsterixUsingManagix.md b/asterix-doc/src/site/markdown/install.md
similarity index 100%
rename from asterix-doc/src/site/markdown/InstallingAsterixUsingManagix.md
rename to asterix-doc/src/site/markdown/install.md
diff --git a/asterix-doc/src/site/site.xml b/asterix-doc/src/site/site.xml
index 4953eb4..f24fb4e 100644
--- a/asterix-doc/src/site/site.xml
+++ b/asterix-doc/src/site/site.xml
@@ -29,14 +29,14 @@
     </links>
 
     <menu name="Documentation">
-      <item name="Installing Asterix using Managix" href="InstallingAsterixUsingManagix.html"/>
-      <item name="AsterixDB 101: An ADM and AQL Primer" href="AdmAql101.html"/>
-      <item name="Asterix Data Model (ADM)" href="AsterixDBDataModel.html"/>
-      <item name="AsterixDB Functions" href="AsterixDBFunctions.html"/>
-      <item name="The Asterix Query Language" href="AsterixQueryLanguageReference.html"/>
-      <item name="AsterixDB Support of Similarity Queries" href="AsterixSimilarityQueries.html"/>
-      <item name="Accessing External Data in AsterixDB" href="AccessingExternalDataInAsterixDB.html"/>
-      <item name="REST API to AsterixDB" href="AsterixDBRestAPI.html"/>
+      <item name="Installing Asterix using Managix" href="install.html"/>
+      <item name="AsterixDB 101: An ADM and AQL Primer" href="aql/primer.html"/>
+      <item name="Asterix Data Model (ADM)" href="aql/datamodel.html"/>
+      <item name="AsterixDB Functions" href="aql/functions.html"/>
+      <item name="The Asterix Query Language" href="aql/manual.html"/>
+      <item name="AsterixDB Support of Similarity Queries" href="aql/similarity.html"/>
+      <item name="Accessing External Data in AsterixDB" href="aql/externaldata.html"/>
+      <item name="REST API to AsterixDB" href="api.html"/>
     </menu>
 
     <menu ref="reports"/>
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-dataflow-hadoop</artifactId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>${hyracks.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
  * A factory class for creating the @see {CNNFeedAdapter}.  
  */
 public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
         cnnFeedAdapter.configure(configuration);
         return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HDFSAdapter
  */
+@SuppressWarnings("deprecation")
 public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static transient String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private String[] readSchedule;
+    private boolean executed[];
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
-        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
+            configureJobConf(configuration);
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hdfsAdapter.configure(configuration);
         return hdfsAdapter;
     }
@@ -39,4 +96,15 @@
         return HDFS_ADAPTER_NAME;
     }
 
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..ba6ade5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,20 +14,92 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HiveAdapter
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static transient String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    public static final String KEY_FORMAT = "format";
+    public static final String KEY_PARSER_FACTORY = "parser";
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_ADM = "adm";
+
+    public static final String HIVE_DATABASE = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_HOME = "hive-home";
+    public static final String HIVE_METASTORE_URI = "metastore-uri";
+    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+    private String[] readSchedule;
+    private boolean executed[];
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        HiveAdapter hiveAdapter = new HiveAdapter(type);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
+            configureJobConf(configuration);
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hiveAdapter.configure(configuration);
         return hiveAdapter;
     }
@@ -36,4 +108,37 @@
     public String getName() {
         return "hive";
     }
+
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+
+        /** configure hive */
+        String database = (String) configuration.get(HIVE_DATABASE);
+        String tablePath = null;
+        if (database == null) {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
+        } else {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
+                    + configuration.get(HIVE_TABLE);
+        }
+        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
+        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
+        }
+
+        if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
+                .get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
+            throw new IllegalArgumentException("file input format"
+                    + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
+        }
+
+        /** configure hdfs */
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.io.Serializable;
+
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
  * Acts as a marker interface indicating that the implementation provides functionality
  * for creating an adapter.
  */
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
 
     /**
      * Returns the display name corresponding to the Adapter type that is created by the factory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
  * an NC.
  */
 public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
         NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
         fsAdapter.configure(configuration);
         return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
  * via pull-based Twitter API.
  */
 public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
         twitterAdapter.configure(configuration);
         return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
 public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
         rssFeedAdapter.configure(configuration);
         return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final String adapterFactory;
-    private final Map<String, String> adapterConfiguration;
+    private final Map<String, Object> adapterConfiguration;
     private final IAType atype;
     private IGenericDatasetAdapterFactory datasourceAdapterFactory;
 
-    public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
-            IAType atype, RecordDescriptor rDesc) {
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+            RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
-        this.adapterFactory = adapter;
         this.adapterConfiguration = arguments;
         this.atype = atype;
-    }
-
-    @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
-        /*
-        Comment: The following code is commented out. This is because constraints are being set at compile time so that they can 
-        be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
-        Once it is there, we will uncomment the following code.  
-        
-        AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
-        switch (constraint.getPartitionConstraintType()) {
-            case ABSOLUTE:
-                String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
-                for (int i = 0; i < locations.length; ++i) {
-                    constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
-                            new ConstantExpression(locations[i])));
-                }
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(locations.length)));
-
-                break;
-            case COUNT:
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
-                break;
-            default:
-                throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
-                        + " not supported");
-
-        }*/
-
+        this.datasourceAdapterFactory = dataSourceAdapterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        try {
-            datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
-        } catch (Exception e) {
-            throw new HyracksDataException("initialization of adapter failed", e);
-        }
+
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
             public void initialize() throws HyracksDataException {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
  * Operator responsible for ingesting data from an external source. This
  * operator uses a (configurable) adapter associated with the feed dataset.
  */
-public class FeedIntakeOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private final String adapterFactoryClassName;
-	private final Map<String, String> adapterConfiguration;
-	private final IAType atype;
-	private final FeedId feedId;
+    private final String adapterFactoryClassName;
+    private final Map<String, Object> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+    private final IAdapterFactory datasourceAdapterFactory;
 
-	private transient IAdapterFactory datasourceAdapterFactory;
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+            IAdapterFactory datasourceAdapterFactory) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+        this.datasourceAdapterFactory = datasourceAdapterFactory;
+    }
 
-	public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
-			String adapter, Map<String, String> arguments, ARecordType atype,
-			RecordDescriptor rDesc) {
-		super(spec, 1, 1);
-		recordDescriptors[0] = rDesc;
-		this.adapterFactoryClassName = adapter;
-		this.adapterConfiguration = arguments;
-		this.atype = atype;
-		this.feedId = feedId;
-	}
-
-	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
-		ITypedDatasourceAdapter adapter;
-		try {
-			datasourceAdapterFactory = (IAdapterFactory) Class.forName(
-					adapterFactoryClassName).newInstance();
-			if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration, atype);
-			} else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration);
-			} else {
-				throw new IllegalStateException(
-						" Unknown adapter factory type for "
-								+ adapterFactoryClassName);
-			}
-			adapter.initialize(ctx);
-		} catch (Exception e) {
-			throw new HyracksDataException("initialization of adapter failed",
-					e);
-		}
-		return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
-	}
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        ITypedDatasourceAdapter adapter;
+        try {
+            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration, atype);
+            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration);
+            } else {
+                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+            }
+            adapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected Map<String, String> configuration;
+    protected Map<String, Object> configuration;
     protected transient AlgebricksPartitionConstraint partitionConstraint;
     protected IAType atype;
     protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
         typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
     }
 
-    protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+    protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
 
     public static final String KEY_FORMAT = "format";
     public static final String KEY_PARSER_FACTORY = "parser";
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_ADM = "adm";
 
-    private static Map<String, String> initializeFormatParserFactoryMap() {
-        Map<String, String> map = new HashMap<String, String>();
+    private static Map<String, Object> initializeFormatParserFactoryMap() {
+        Map<String, Object> map = new HashMap<String, Object>();
         map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
         map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
         return map;
@@ -77,7 +77,7 @@
      * @param attribute
      *            The attribute whose value needs to be obtained.
      */
-    public String getAdapterProperty(String attribute) {
+    public Object getAdapterProperty(String attribute) {
         return configuration.get(attribute);
     }
 
@@ -86,7 +86,7 @@
      * 
      * @return A Map<String,String> instance representing the adapter configuration.
      */
-    public Map<String, String> getConfiguration() {
+    public Map<String, Object> getConfiguration() {
         return configuration;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
     public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
 
     @Override
-    public abstract void configure(Map<String, String> arguments) throws Exception;
+    public abstract void configure(Map<String, Object> arguments) throws Exception;
 
     @Override
     public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
     }
 
     protected void configureFormat() throws Exception {
-        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+        String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
-            String specifiedFormat = configuration.get(KEY_FORMAT);
+            String specifiedFormat = (String) configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
                 parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
-            } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+            } else if (FORMAT_ADM.equalsIgnoreCase((String)configuration.get(KEY_FORMAT))) {
                 parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
             } else {
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 
 /**
  * Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
 public class HDFSAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
 
-    public static final String KEY_HDFS_URL = "hdfs";
-    public static final String KEY_INPUT_FORMAT = "input-format";
-    public static final String INPUT_FORMAT_TEXT = "text-input-format";
-    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
-    private Object[] inputSplits;
+    private transient String[] readSchedule;
+    private transient boolean executed[];
+    private transient InputSplit[] inputSplits;
     private transient JobConf conf;
-    private InputSplitsProxy inputSplitsProxy;
-    private static final Map<String, String> formatClassNames = initInputFormatMap();
+    private transient AlgebricksPartitionConstraint clusterLocations;
 
-    private static Map<String, String> initInputFormatMap() {
-        Map<String, String> formatClassNames = new HashMap<String, String>();
-        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
-        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
-        return formatClassNames;
-    }
+    private transient String nodeName;
 
-    public HDFSAdapter(IAType atype) {
+    public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
         super(atype);
+        this.readSchedule = readSchedule;
+        this.executed = executed;
+        this.inputSplits = inputSplits;
+        this.conf = conf;
+        this.clusterLocations = clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
         configureFormat();
-        configureJobConf();
-        configureSplits();
-    }
-
-    private void configureSplits() throws IOException {
-        if (inputSplitsProxy == null) {
-            inputSplits = conf.getInputFormat().getSplits(conf, 0);
-        }
-        inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
-    }
-
-    private void configurePartitionConstraint() throws Exception {
-        List<String> locations = new ArrayList<String>();
-        Random random = new Random();
-        boolean couldConfigureLocationConstraints = false;
-        try {
-            Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
-            for (Object inputSplit : inputSplits) {
-                String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
-                if (dataNodeLocations == null || dataNodeLocations.length == 0) {
-                    throw new IllegalArgumentException("No datanode locations found: check hdfs path");
-                }
-
-                // loop over all replicas until a split location coincides
-                // with an asterix datanode location
-                for (String datanodeLocation : dataNodeLocations) {
-                    Set<String> nodeControllersAtLocation = null;
-                    try {
-                        nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
-                                .getIPAddress(datanodeLocation));
-                    } catch (UnknownHostException uhe) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
-                        }
-                        continue;
-                    }
-                    if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
-                                    + " will look at replica location");
-                        }
-                        couldConfigureLocationConstraints = false;
-                    } else {
-                        int locationIndex = random.nextInt(nodeControllersAtLocation.size());
-                        String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
-                        locations.add(chosenLocation);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
-                        }
-                        couldConfigureLocationConstraints = true;
-                        break;
-                    }
-                }
-
-                /* none of the replica locations coincides with an Asterix
-                   node controller location.
-                */
-                if (!couldConfigureLocationConstraints) {
-                    List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
-                    int locationIndex = random.nextInt(allNodeControllers.size());
-                    String chosenLocation = allNodeControllers.get(locationIndex);
-                    locations.add(chosenLocation);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
-                                + " will be processed by a remote node controller:" + chosenLocation);
-                    }
-                }
-            }
-            partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
-            }
-            partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
-        }
-    }
-
-    private JobConf configureJobConf() throws Exception {
-        conf = new JobConf();
-        conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
-        conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
-        conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
-        return conf;
     }
 
     public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
-        inputSplits = inputSplitsProxy.toInputSplits(conf);
+        this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
     }
 
     private Reporter getReporter() {
@@ -215,98 +112,124 @@
         return reporter;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public InputStream getInputStream(int partition) throws IOException {
-        try {
-            InputStream inputStream;
-            if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-                SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-                RecordReader reader = format.getRecordReader(
-                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                inputStream = new HDFSStream(reader, ctx);
-            } else {
-                try {
+
+        return new InputStream() {
+
+            private RecordReader<Object, Text> reader;
+            private Object key;
+            private Text value;
+            private boolean hasMore = false;
+            private int EOL = "\n".getBytes()[0];
+            private Text pendingValue = null;
+            private int currentSplitIndex = 0;
+
+            @SuppressWarnings("unchecked")
+            private boolean moveToNext() throws IOException {
+                for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+                    /**
+                     * read all the partitions scheduled to the current node
+                     */
+                    if (readSchedule[currentSplitIndex].equals(nodeName)) {
+                        /**
+                         * pick an unread split to read
+                         * synchronize among simultaneous partitions in the same machine
+                         */
+                        synchronized (executed) {
+                            if (executed[currentSplitIndex] == false) {
+                                executed[currentSplitIndex] = true;
+                            } else {
+                                continue;
+                            }
+                        }
+
+                        /**
+                         * read the split
+                         */
+                        reader = getRecordReader(currentSplitIndex);
+                        key = reader.createKey();
+                        value = (Text) reader.createValue();
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public int read(byte[] buffer, int offset, int len) throws IOException {
+                if (reader == null) {
+                    if (!moveToNext()) {
+                        //nothing to read
+                        return -1;
+                    }
+                }
+
+                int numBytes = 0;
+                if (pendingValue != null) {
+                    System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+                    buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+                    numBytes += pendingValue.getLength() + 1;
+                    pendingValue = null;
+                }
+
+                while (numBytes < len) {
+                    hasMore = reader.next(key, value);
+                    if (!hasMore) {
+                        while (moveToNext()) {
+                            hasMore = reader.next(key, value);
+                            if (hasMore) {
+                                //move to the next non-empty split
+                                break;
+                            }
+                        }
+                    }
+                    if (!hasMore) {
+                        return (numBytes == 0) ? -1 : numBytes;
+                    }
+                    int sizeOfNextTuple = value.getLength() + 1;
+                    if (numBytes + sizeOfNextTuple > len) {
+                        // cannot add tuple to current buffer
+                        // but the reader has moved pass the fetched tuple
+                        // we need to store this for a subsequent read call.
+                        // and return this then.
+                        pendingValue = value;
+                        break;
+                    } else {
+                        System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+                        buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+                        numBytes += sizeOfNextTuple;
+                    }
+                }
+                return numBytes;
+            }
+
+            @Override
+            public int read() throws IOException {
+                throw new NotImplementedException("Use read(byte[], int, int");
+            }
+
+            private RecordReader getRecordReader(int slitIndex) throws IOException {
+                if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+                    SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+                    RecordReader reader = format.getRecordReader(
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+                    return reader;
+                } else {
                     TextInputFormat format = (TextInputFormat) conf.getInputFormat();
                     RecordReader reader = format.getRecordReader(
-                            (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                    inputStream = new HDFSStream(reader, ctx);
-                } catch (FileNotFoundException e) {
-                    throw new HyracksDataException(e);
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+                    return reader;
                 }
             }
-            return inputStream;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+
+        };
 
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            configurePartitionConstraint();
-        }
-        return partitionConstraint;
-    }
-
-}
-
-class HDFSStream extends InputStream {
-
-    private RecordReader<Object, Text> reader;
-    private final Object key;
-    private final Text value;
-    private boolean hasMore = false;
-    private static final int EOL = "\n".getBytes()[0];
-    private Text pendingValue = null;
-
-    public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
-        this.reader = reader;
-        key = reader.createKey();
-        try {
-            value = (Text) reader.createValue();
-        } catch (ClassCastException cce) {
-            throw new Exception("value is not of type org.apache.hadoop.io.Text"
-                    + " type not supported in sequence file format", cce);
-        }
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        int numBytes = 0;
-        if (pendingValue != null) {
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
-            pendingValue = null;
-        }
-
-        while (numBytes < len) {
-            hasMore = reader.next(key, value);
-            if (!hasMore) {
-                return (numBytes == 0) ? -1 : numBytes;
-            }
-            int sizeOfNextTuple = value.getLength() + 1;
-            if (numBytes + sizeOfNextTuple > len) {
-                // cannot add tuple to current buffer
-                // but the reader has moved pass the fetched tuple
-                // we need to store this for a subsequent read call.
-                // and return this then.
-                pendingValue = value;
-                break;
-            } else {
-                System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                numBytes += sizeOfNextTuple;
-            }
-        }
-        return numBytes;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use read(byte[], int, int");
+        return clusterLocations;
     }
 
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..5e48834 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
 
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
 /**
  * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapter extends AbstractDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
 
     private HDFSAdapter hdfsAdapter;
 
-    public HiveAdapter(IAType atype) {
-        this.hdfsAdapter = new HDFSAdapter(atype);
+    public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
+        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         this.atype = atype;
     }
 
@@ -48,33 +53,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
-        configureHadoopAdapter();
-    }
-
-    private void configureHadoopAdapter() throws Exception {
-        String database = configuration.get(HIVE_DATABASE);
-        String tablePath = null;
-        if (database == null) {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
-        } else {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
-                    + configuration.get(HIVE_TABLE);
-        }
-        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
-        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
-            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
-        }
-
-        if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
-                .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
-            throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
-                    + " is not supported");
-        }
-
-        hdfsAdapter = new HDFSAdapter(atype);
-        hdfsAdapter.configure(configuration);
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
+        this.hdfsAdapter.configure(arguments);
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
      * @return String the value corresponding to the configuration parameter
      *         represented by the key- attributeKey.
      */
-    public String getAdapterProperty(String propertyKey);
+    public Object getAdapterProperty(String propertyKey);
 
     /**
      * Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
      *            providing all arguments as a set of (key,value) pairs. These
      *            arguments are put into the metadata.
      */
-    public void configure(Map<String, String> arguments) throws Exception;
+    public void configure(Map<String, Object> arguments) throws Exception;
 
     /**
      * Returns a list of partition constraints. A partition constraint can be a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         this.configuration = arguments;
-        String[] splits = arguments.get(KEY_PATH).split(",");
+        String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
         configureFileSplits(splits);
         configureFormat();
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
  */
 public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
 
-   
     private static final long serialVersionUID = 1L;
-    
+
     public static final String QUERY = "query";
     public static final String INTERVAL = "interval";
 
@@ -49,7 +48,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
         String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
     }
 
     @Override
-    public void stop()  {
+    public void stop() {
         tweetClient.stop();
     }
 
     @Override
-    public void alter(Map<String, String> properties)  {
+    public void alter(Map<String, String> properties) {
         alterRequested = true;
         this.alteredParams = properties;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
         tupleFieldValues = new String[recordType.getFieldNames().length];
     }
 
-    public void initialize(Map<String, String> params) {
-        this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+    public void initialize(Map<String, Object> params) {
+        this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.query = new Query(keywords);
         query.setRpp(100);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
@@ -94,7 +94,7 @@
     }
 
     protected void reconfigure(Map<String, String> arguments) {
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -101,6 +102,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -119,6 +121,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -157,6 +160,7 @@
     private final AsterixStorageProperties storageProperties;
 
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+    private static Scheduler hdfsScheduler;
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -178,6 +182,16 @@
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        try {
+            if (hdfsScheduler == null) {
+                //set the singleton hdfs scheduler
+                hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+                        .getClusterControllerInfo().getClientNetPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void setJobId(JobId jobId) {
@@ -343,8 +357,8 @@
                 adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
-                    itemType);
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+                    wrapProperties(datasetDetails.getProperties()), itemType);
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -362,7 +376,7 @@
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+                wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -427,14 +441,15 @@
             }
 
             if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
-                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+                        .getProperties()));
                 adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
             } else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
                 String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
                 adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
                         outputTypeName).getDatatype();
                 adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-                        datasetDetails.getProperties(), adapterOutputType);
+                        wrapProperties(datasetDetails.getProperties()), adapterOutputType);
             } else {
                 throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
@@ -451,7 +466,8 @@
 
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
                 dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
-                datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+                this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint = null;
         try {
@@ -1484,4 +1500,32 @@
         return FormatUtils.getDefaultFormat();
     }
 
+    /**
+     * Add HDFS scheduler and the cluster location constraint into the scheduler
+     * 
+     * @param properties
+     *            the original dataset properties
+     * @return a new map containing the original dataset properties and the scheduler/locations
+     */
+    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        wrappedProperties.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+        wrappedProperties.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+        return wrappedProperties;
+    }
+
+    /**
+     * Adapt the original properties to a string-object map
+     * 
+     * @param properties
+     *            the original properties
+     * @return the new stirng-object map
+     */
+    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        return wrappedProperties;
+    }
+
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
         IManagedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
+    private FileSystemBasedAdapter coreAdapter;
+    private String format;
 
-    public static final String KEY_FILE_SYSTEM = "fs";
-    public static final String LOCAL_FS = "localfs";
-    public static final String HDFS = "hdfs";
-
-    private final FileSystemBasedAdapter coreAdapter;
-    private final Map<String, String> configuration;
-    private final String fileSystem;
-    private final String format;
-
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+            FileSystemBasedAdapter coreAdapter, String format) throws Exception {
         super(atype);
-        checkRequiredArgs(configuration);
-        fileSystem = configuration.get(KEY_FILE_SYSTEM);
-        String adapterFactoryClass = null;
-        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
-        } else if (fileSystem.equals(HDFS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
-        } else {
-            throw new AsterixException("Unsupported file system type " + fileSystem);
-        }
-        format = configuration.get(KEY_FORMAT);
-        IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
-                adapterFactoryClass).newInstance();
-        coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
         this.configuration = configuration;
-    }
-
-    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
-        if (configuration.get(KEY_FILE_SYSTEM) == null) {
-            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
-        }
-        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (output-type-name=?)");
-        }
-        if (configuration.get(KEY_PATH) == null) {
-            throw new Exception("File path not specified (path=?)");
-        }
-        if (configuration.get(KEY_FORMAT) == null) {
-            throw new Exception("File format not specified (format=?)");
-        }
+        this.coreAdapter = coreAdapter;
+        this.format = format;
     }
 
     @Override
@@ -103,7 +69,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         coreAdapter.configure(arguments);
     }
 
@@ -189,16 +155,16 @@
 
     private final ARecordType recordType;
     private final IDataParser dataParser;
-    private final Map<String, String> configuration;
+    private final Map<String, Object> configuration;
 
     public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, String> configuration) {
+            char fieldDelimiter, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
         this.configuration = configuration;
     }
 
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new ADMDataParser();
         this.configuration = configuration;
@@ -221,10 +187,10 @@
     public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
 
     public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) {
+            Map<String, Object> configuration) {
         super(ctx, recType);
         this.dataParser = dataParser;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
         if (propValue != null) {
             interTupleInterval = Long.parseLong(propValue);
         } else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
  * source file has been ingested.
  */
 public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+    
+    public static final String KEY_FILE_SYSTEM = "fs";
+    public static final String LOCAL_FS = "localfs";
+    public static final String HDFS = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_FORMAT = "format";
+
+    private IGenericDatasetAdapterFactory adapterFactory;
+    private String format;
+    private boolean setup = false;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+        if (!setup) {
+            checkRequiredArgs(configuration);
+            String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+            String adapterFactoryClass = null;
+            if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+            } else if (fileSystem.equals(HDFS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+            } else {
+                throw new AsterixException("Unsupported file system type " + fileSystem);
+            }
+            format = (String) configuration.get(KEY_FORMAT);
+            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+            setup = true;
+        }
+        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+                (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
     }
 
     @Override
@@ -39,4 +68,19 @@
         return "file_feed";
     }
 
+    private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+        if (configuration.get(KEY_FILE_SYSTEM) == null) {
+            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+        }
+        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+            throw new Exception("Record type not specified (output-type-name=?)");
+        }
+        if (configuration.get(KEY_PATH) == null) {
+            throw new Exception("File path not specified (path=?)");
+        }
+        if (configuration.get(KEY_FORMAT) == null) {
+            throw new Exception("File format not specified (format=?)");
+        }
+    }
+
 }
\ No newline at end of file