Merge branch 'master' into yingyi/asterix_beta_fix
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
 
 <configuration>
 
-  <property>
-    <name>mapred.job.tracker</name>
-    <value>localhost:29007</value>
-  </property>
-  <property>
-     <name>mapred.tasktracker.map.tasks.maximum</name>
-     <value>20</value>
-  </property>
-   <property>
-      <name>mapred.tasktracker.reduce.tasks.maximum</name>
-      <value>20</value>
-   </property>
-   <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
-   </property>
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
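+	<!-- 128 bytes is far below any real block size; presumably chosen so even
+	     tiny test files split into multiple input splits for parallel reads. -->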
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>128</value>
+	</property>
 
 </configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
 use dataverse test;
 
 for $x in dataset('TextDataset')
+order by $x.line
 return $x
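+/* the order by above makes the result deterministic: parallel HDFS readers
+   may return lines in any order */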
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
 { "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
 { "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing  to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-dataflow-hadoop</artifactId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>${hyracks.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
  * A factory class for creating the @see {CNNFeedAdapter}.  
  */
 public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
         cnnFeedAdapter.configure(configuration);
         return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HDFSAdapter
  */
+@SuppressWarnings("deprecation")
 public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static final String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private String[] readSchedule;
+    private boolean[] executed;
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
-        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /** Set up the serializable factory state; this block runs only once per factory instance. */
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
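+            // readSchedule[i] names the node assigned to read split i; the shared
+            // executed[] flags let co-located partitions claim each split exactly once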
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
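+        // each call rebuilds the JobConf and input splits from their serializable factories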
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hdfsAdapter.configure(configuration);
         return hdfsAdapter;
     }
@@ -39,4 +96,15 @@
         return HDFS_ADAPTER_NAME;
     }
 
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
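+        // expected adapter properties: "hdfs" = namenode URL, "path" = input
+        // path(s), "input-format" = "text-input-format" or "sequence-input-format"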
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..1845c07 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,26 +14,96 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HiveAdapter
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static final String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    private String[] readSchedule;
+    private boolean[] executed;
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private AlgebricksPartitionConstraint clusterLocations;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        HiveAdapter hiveAdapter = new HiveAdapter(type);
-        hiveAdapter.configure(configuration);
-        return hiveAdapter;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /** Set up the serializable factory state; this block runs only once for each factory instance. */
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+        return hiveAdapter;
     }
 
     @Override
     public String getName() {
         return "hive";
     }
+
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
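+        // same property contract as HDFSAdapterFactory.configureJobConf()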
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.io.Serializable;
+
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
  * Acts as a marker interface indicating that the implementation provides functionality
  * for creating an adapter.
  */
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
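+    // Serializable because factories are now built at compile time and shipped
+    // to the node controllers inside the serialized job specification.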
 
     /**
      * Returns the display name corresponding to the Adapter type that is created by the factory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
  * an NC.
  */
 public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
         NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
         fsAdapter.configure(configuration);
         return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
  * via pull-based Twitter API.
  */
 public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
         twitterAdapter.configure(configuration);
         return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
 public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
         rssFeedAdapter.configure(configuration);
         return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final String adapterFactory;
-    private final Map<String, String> adapterConfiguration;
+    private final Map<String, Object> adapterConfiguration;
     private final IAType atype;
     private IGenericDatasetAdapterFactory datasourceAdapterFactory;
 
-    public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
-            IAType atype, RecordDescriptor rDesc) {
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+            RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
-        this.adapterFactory = adapter;
         this.adapterConfiguration = arguments;
         this.atype = atype;
-    }
-
-    @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
-        /*
-        Comment: The following code is commented out. This is because constraints are being set at compile time so that they can 
-        be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
-        Once it is there, we will uncomment the following code.  
-        
-        AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
-        switch (constraint.getPartitionConstraintType()) {
-            case ABSOLUTE:
-                String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
-                for (int i = 0; i < locations.length; ++i) {
-                    constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
-                            new ConstantExpression(locations[i])));
-                }
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(locations.length)));
-
-                break;
-            case COUNT:
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
-                break;
-            default:
-                throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
-                        + " not supported");
-
-        }*/
-
+        this.datasourceAdapterFactory = dataSourceAdapterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        try {
-            datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
-        } catch (Exception e) {
-            throw new HyracksDataException("initialization of adapter failed", e);
-        }
+
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
             public void initialize() throws HyracksDataException {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
  * Operator responsible for ingesting data from an external source. This
  * operator uses a (configurable) adapter associated with the feed dataset.
  */
-public class FeedIntakeOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private final String adapterFactoryClassName;
-	private final Map<String, String> adapterConfiguration;
-	private final IAType atype;
-	private final FeedId feedId;
+    private final String adapterFactoryClassName;
+    private final Map<String, Object> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+    private final IAdapterFactory datasourceAdapterFactory;
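+    // the factory now arrives pre-constructed and serialized with the job,
+    // instead of being reflectively instantiated in createPushRuntime()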
 
-	private transient IAdapterFactory datasourceAdapterFactory;
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+            IAdapterFactory datasourceAdapterFactory) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+        this.datasourceAdapterFactory = datasourceAdapterFactory;
+    }
 
-	public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
-			String adapter, Map<String, String> arguments, ARecordType atype,
-			RecordDescriptor rDesc) {
-		super(spec, 1, 1);
-		recordDescriptors[0] = rDesc;
-		this.adapterFactoryClassName = adapter;
-		this.adapterConfiguration = arguments;
-		this.atype = atype;
-		this.feedId = feedId;
-	}
-
-	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
-		ITypedDatasourceAdapter adapter;
-		try {
-			datasourceAdapterFactory = (IAdapterFactory) Class.forName(
-					adapterFactoryClassName).newInstance();
-			if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration, atype);
-			} else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration);
-			} else {
-				throw new IllegalStateException(
-						" Unknown adapter factory type for "
-								+ adapterFactoryClassName);
-			}
-			adapter.initialize(ctx);
-		} catch (Exception e) {
-			throw new HyracksDataException("initialization of adapter failed",
-					e);
-		}
-		return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
-	}
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        ITypedDatasourceAdapter adapter;
+        try {
+            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration, atype);
+            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration);
+            } else {
+                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+            }
+            adapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected Map<String, String> configuration;
+    protected Map<String, Object> configuration;
     protected transient AlgebricksPartitionConstraint partitionConstraint;
     protected IAType atype;
     protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
         typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
     }
 
-    protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+    protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
 
     public static final String KEY_FORMAT = "format";
     public static final String KEY_PARSER_FACTORY = "parser";
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_ADM = "adm";
 
-    private static Map<String, String> initializeFormatParserFactoryMap() {
-        Map<String, String> map = new HashMap<String, String>();
+    private static Map<String, Object> initializeFormatParserFactoryMap() {
+        Map<String, Object> map = new HashMap<String, Object>();
         map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
         map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
         return map;
@@ -77,7 +77,7 @@
      * @param attribute
      *            The attribute whose value needs to be obtained.
      */
-    public String getAdapterProperty(String attribute) {
+    public Object getAdapterProperty(String attribute) {
         return configuration.get(attribute);
     }
 
@@ -86,7 +86,7 @@
      * 
-     * @return A Map<String,String> instance representing the adapter configuration.
+     * @return A Map<String,Object> instance representing the adapter configuration.
      */
-    public Map<String, String> getConfiguration() {
+    public Map<String, Object> getConfiguration() {
         return configuration;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
     public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
 
     @Override
-    public abstract void configure(Map<String, String> arguments) throws Exception;
+    public abstract void configure(Map<String, Object> arguments) throws Exception;
 
     @Override
     public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
     }
 
     protected void configureFormat() throws Exception {
-        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+        String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
-            String specifiedFormat = configuration.get(KEY_FORMAT);
+            String specifiedFormat = (String) configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
                 parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
-            } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+            } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
                 parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
             } else {
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 
 /**
  * Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
 public class HDFSAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
 
-    public static final String KEY_HDFS_URL = "hdfs";
-    public static final String KEY_INPUT_FORMAT = "input-format";
-    public static final String INPUT_FORMAT_TEXT = "text-input-format";
-    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
-    private Object[] inputSplits;
+    private transient String[] readSchedule;
+    private transient boolean[] executed;
+    private transient InputSplit[] inputSplits;
     private transient JobConf conf;
-    private InputSplitsProxy inputSplitsProxy;
-    private static final Map<String, String> formatClassNames = initInputFormatMap();
+    private transient AlgebricksPartitionConstraint clusterLocations;
 
-    private static Map<String, String> initInputFormatMap() {
-        Map<String, String> formatClassNames = new HashMap<String, String>();
-        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
-        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
-        return formatClassNames;
-    }
+    private transient String nodeName;
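+    // none of this state is serialized: the factory re-supplies it when constructing
+    // the adapter on each node, and nodeName is resolved in initialize()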
 
-    public HDFSAdapter(IAType atype) {
+    public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
         super(atype);
+        this.readSchedule = readSchedule;
+        this.executed = executed;
+        this.inputSplits = inputSplits;
+        this.conf = conf;
+        this.clusterLocations = clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
         configureFormat();
-        configureJobConf();
-        configureSplits();
-    }
-
-    private void configureSplits() throws IOException {
-        if (inputSplitsProxy == null) {
-            inputSplits = conf.getInputFormat().getSplits(conf, 0);
-        }
-        inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
-    }
-
-    private void configurePartitionConstraint() throws Exception {
-        List<String> locations = new ArrayList<String>();
-        Random random = new Random();
-        boolean couldConfigureLocationConstraints = false;
-        try {
-            Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
-            for (Object inputSplit : inputSplits) {
-                String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
-                if (dataNodeLocations == null || dataNodeLocations.length == 0) {
-                    throw new IllegalArgumentException("No datanode locations found: check hdfs path");
-                }
-
-                // loop over all replicas until a split location coincides
-                // with an asterix datanode location
-                for (String datanodeLocation : dataNodeLocations) {
-                    Set<String> nodeControllersAtLocation = null;
-                    try {
-                        nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
-                                .getIPAddress(datanodeLocation));
-                    } catch (UnknownHostException uhe) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
-                        }
-                        continue;
-                    }
-                    if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
-                                    + " will look at replica location");
-                        }
-                        couldConfigureLocationConstraints = false;
-                    } else {
-                        int locationIndex = random.nextInt(nodeControllersAtLocation.size());
-                        String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
-                        locations.add(chosenLocation);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
-                        }
-                        couldConfigureLocationConstraints = true;
-                        break;
-                    }
-                }
-
-                /* none of the replica locations coincides with an Asterix
-                   node controller location.
-                */
-                if (!couldConfigureLocationConstraints) {
-                    List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
-                    int locationIndex = random.nextInt(allNodeControllers.size());
-                    String chosenLocation = allNodeControllers.get(locationIndex);
-                    locations.add(chosenLocation);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
-                                + " will be processed by a remote node controller:" + chosenLocation);
-                    }
-                }
-            }
-            partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
-            }
-            partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
-        }
-    }
-
-    private JobConf configureJobConf() throws Exception {
-        conf = new JobConf();
-        conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
-        conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
-        conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
-        return conf;
     }
 
     public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
-        inputSplits = inputSplitsProxy.toInputSplits(conf);
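+        // record which node controller this task runs on, so moveToNext()
+        // claims only the splits scheduled for this node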
+        this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
     }
 
     private Reporter getReporter() {
@@ -215,98 +112,124 @@
         return reporter;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public InputStream getInputStream(int partition) throws IOException {
-        try {
-            InputStream inputStream;
-            if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-                SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-                RecordReader reader = format.getRecordReader(
-                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                inputStream = new HDFSStream(reader, ctx);
-            } else {
-                try {
+
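+            /**
+             * A stream over all splits scheduled to this node, emitting one
+             * newline-terminated line per record.
+             */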
+        return new InputStream() {
+
+            private RecordReader<Object, Text> reader;
+            private Object key;
+            private Text value;
+            private boolean hasMore = false;
+            private final int EOL = "\n".getBytes()[0];
+            private Text pendingValue = null;
+            private int currentSplitIndex = 0;
+
+            @SuppressWarnings("unchecked")
+            private boolean moveToNext() throws IOException {
+                for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+                    /**
+                     * read all the partitions scheduled to the current node
+                     */
+                    if (readSchedule[currentSplitIndex].equals(nodeName)) {
+                        /**
+                         * pick an unread split to read, synchronizing among
+                         * concurrent partitions on the same machine
+                         */
+                        synchronized (executed) {
+                            if (!executed[currentSplitIndex]) {
+                                executed[currentSplitIndex] = true;
+                            } else {
+                                continue;
+                            }
+                        }
+
+                        /**
+                         * read the split
+                         */
+                        reader = getRecordReader(currentSplitIndex);
+                        key = reader.createKey();
+                        value = (Text) reader.createValue();
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public int read(byte[] buffer, int offset, int len) throws IOException {
+                if (reader == null) {
+                    if (!moveToNext()) {
+                        //nothing to read
+                        return -1;
+                    }
+                }
+
+                int numBytes = 0;
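+                // first flush any tuple held over from the previous read call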
+                if (pendingValue != null) {
+                    System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+                    buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+                    numBytes += pendingValue.getLength() + 1;
+                    pendingValue = null;
+                }
+
+                while (numBytes < len) {
+                    hasMore = reader.next(key, value);
+                    if (!hasMore) {
+                        while (moveToNext()) {
+                            hasMore = reader.next(key, value);
+                            if (hasMore) {
+                                //move to the next non-empty split
+                                break;
+                            }
+                        }
+                    }
+                    if (!hasMore) {
+                        return (numBytes == 0) ? -1 : numBytes;
+                    }
+                    int sizeOfNextTuple = value.getLength() + 1;
+                    if (numBytes + sizeOfNextTuple > len) {
+                        // The tuple does not fit in the current buffer, but the reader
+                        // has already moved past it, so stash it for the next read call.
+                        pendingValue = value;
+                        break;
+                    } else {
+                        System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+                        buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+                        numBytes += sizeOfNextTuple;
+                    }
+                }
+                return numBytes;
+            }
+
+            @Override
+            public int read() throws IOException {
+                throw new NotImplementedException("Use read(byte[], int, int)");
+            }
+
+            private RecordReader getRecordReader(int splitIndex) throws IOException {
+                if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+                    SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+                    RecordReader reader = format.getRecordReader(
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+                    return reader;
+                } else {
                     TextInputFormat format = (TextInputFormat) conf.getInputFormat();
                     RecordReader reader = format.getRecordReader(
-                            (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                    inputStream = new HDFSStream(reader, ctx);
-                } catch (FileNotFoundException e) {
-                    throw new HyracksDataException(e);
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+                    return reader;
                 }
             }
-            return inputStream;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+
+        };
 
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            configurePartitionConstraint();
-        }
-        return partitionConstraint;
-    }
-
-}
-
-class HDFSStream extends InputStream {
-
-    private RecordReader<Object, Text> reader;
-    private final Object key;
-    private final Text value;
-    private boolean hasMore = false;
-    private static final int EOL = "\n".getBytes()[0];
-    private Text pendingValue = null;
-
-    public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
-        this.reader = reader;
-        key = reader.createKey();
-        try {
-            value = (Text) reader.createValue();
-        } catch (ClassCastException cce) {
-            throw new Exception("value is not of type org.apache.hadoop.io.Text"
-                    + " type not supported in sequence file format", cce);
-        }
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        int numBytes = 0;
-        if (pendingValue != null) {
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
-            pendingValue = null;
-        }
-
-        while (numBytes < len) {
-            hasMore = reader.next(key, value);
-            if (!hasMore) {
-                return (numBytes == 0) ? -1 : numBytes;
-            }
-            int sizeOfNextTuple = value.getLength() + 1;
-            if (numBytes + sizeOfNextTuple > len) {
-                // cannot add tuple to current buffer
-                // but the reader has moved pass the fetched tuple
-                // we need to store this for a subsequent read call.
-                // and return this then.
-                pendingValue = value;
-                break;
-            } else {
-                System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                numBytes += sizeOfNextTuple;
-            }
-        }
-        return numBytes;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use read(byte[], int, int");
+        return clusterLocations;
     }
 
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..7f15a09 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
 
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
 /**
  * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapter extends AbstractDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
 
     private HDFSAdapter hdfsAdapter;
 
-    public HiveAdapter(IAType atype) {
-        this.hdfsAdapter = new HDFSAdapter(atype);
+    public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
+        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         this.atype = atype;
     }
 
@@ -48,33 +53,8 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
-        configureHadoopAdapter();
-    }
+    public void configure(Map<String, Object> arguments) throws Exception {
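+        // no-op: the JobConf and input splits are now prepared by HiveAdapterFactory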
 
-    private void configureHadoopAdapter() throws Exception {
-        String database = configuration.get(HIVE_DATABASE);
-        String tablePath = null;
-        if (database == null) {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
-        } else {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
-                    + configuration.get(HIVE_TABLE);
-        }
-        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
-        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
-            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
-        }
-
-        if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
-                .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
-            throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
-                    + " is not supported");
-        }
-
-        hdfsAdapter = new HDFSAdapter(atype);
-        hdfsAdapter.configure(configuration);
     }
 
     @Override
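
HiveAdapter now receives a fully constructed HDFSAdapter instead of deriving the table path itself. For reference, the path derivation that the removed configureHadoopAdapter() performed looks like the sketch below; note that the removed code concatenated the still-null tablePath in the database branch, so the database name is substituted here. The key strings are placeholders for the HIVE_* constants, whose literal values are not shown in this patch:

    import java.util.Map;

    public class HiveTablePaths {
        // Sketch of the removed logic, with the database branch corrected.
        static String hiveTablePath(Map<String, String> conf) {
            String warehouse = conf.get("hive-warehouse-dir"); // placeholder for HIVE_WAREHOUSE_DIR
            String database = conf.get("hive-database");       // placeholder for HIVE_DATABASE
            String table = conf.get("hive-table");             // placeholder for HIVE_TABLE
            if (database == null) {
                return warehouse + "/" + table;
            }
            // the removed code appended the still-null tablePath here instead of the database name
            return warehouse + "/" + database + ".db/" + table;
        }
    }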
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
      * @return String the value corresponding to the configuration parameter
      *         represented by the key- attributeKey.
      */
-    public String getAdapterProperty(String propertyKey);
+    public Object getAdapterProperty(String propertyKey);
 
     /**
      * Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
      *            providing all arguments as a set of (key,value) pairs. These
      *            arguments are put into the metadata.
      */
-    public void configure(Map<String, String> arguments) throws Exception;
+    public void configure(Map<String, Object> arguments) throws Exception;
 
     /**
      * Returns a list of partition constraints. A partition constraint can be a
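
Widening configure() from Map<String, String> to Map<String, Object> lets a single map carry plain string properties alongside shared runtime objects such as the HDFS scheduler and the partition constraint. A small illustration, with stand-in objects and assumed key names (the real keys are the HDFSAdapterFactory.SCHEDULER and CLUSTER_LOCATIONS constants):

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigureSignatureExample {
        public static void main(String[] args) {
            // Stand-ins for the non-string objects the metadata provider now injects.
            Object hdfsScheduler = new Object();
            Object clusterLocations = new Object();

            Map<String, Object> arguments = new HashMap<String, Object>();
            arguments.put("path", "hdfs://localhost:9000/data/input"); // ordinary string property
            arguments.put("scheduler", hdfsScheduler);                 // would not fit in Map<String, String>
            arguments.put("cluster-locations", clusterLocations);      // ditto
            // adapter.configure(arguments) now receives both kinds of values
        }
    }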
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         this.configuration = arguments;
-        String[] splits = arguments.get(KEY_PATH).split(",");
+        String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
         configureFileSplits(splits);
         configureFormat();
     }
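
The widened map forces casts such as ((String) arguments.get(KEY_PATH)) above. A hypothetical helper, not part of this patch, that localizes the failure instead of surfacing an anonymous ClassCastException at each cast site:

    import java.util.Map;

    public class AdapterProperties {
        // Defensive typed lookup for string-valued configuration entries.
        static String getString(Map<String, Object> conf, String key) {
            Object value = conf.get(key);
            if (value == null) {
                throw new IllegalArgumentException("missing required property: " + key);
            }
            if (!(value instanceof String)) {
                throw new IllegalArgumentException(key + " must be a String, got " + value.getClass().getName());
            }
            return (String) value;
        }
    }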
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
  */
 public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
 
-   
     private static final long serialVersionUID = 1L;
-    
+
     public static final String QUERY = "query";
     public static final String INTERVAL = "interval";
 
@@ -49,7 +48,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
         String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
     }
 
     @Override
-    public void stop()  {
+    public void stop() {
         tweetClient.stop();
     }
 
     @Override
-    public void alter(Map<String, String> properties)  {
+    public void alter(Map<String, String> properties) {
         alterRequested = true;
         this.alteredParams = properties;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
         tupleFieldValues = new String[recordType.getFieldNames().length];
     }
 
-    public void initialize(Map<String, String> params) {
-        this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+    public void initialize(Map<String, Object> params) {
+        this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.query = new Query(keywords);
         query.setRpp(100);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
@@ -94,7 +94,7 @@
     }
 
     protected void reconfigure(Map<String, String> arguments) {
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -101,6 +102,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -119,6 +121,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -157,6 +160,7 @@
     private final AsterixStorageProperties storageProperties;
 
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+    private static Scheduler hdfsScheduler;
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -178,6 +182,16 @@
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        try {
+            if (hdfsScheduler == null) {
+                //set the singleton hdfs scheduler
+                hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+                        .getClusterControllerInfo().getClientNetPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void setJobId(JobId jobId) {
@@ -343,8 +357,8 @@
                 adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
-                    itemType);
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+                    wrapProperties(datasetDetails.getProperties()), itemType);
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -362,7 +376,7 @@
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+                wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -427,14 +441,15 @@
             }
 
             if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
-                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+                        .getProperties()));
                 adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
             } else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
                 String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
                 adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
                         outputTypeName).getDatatype();
                 adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-                        datasetDetails.getProperties(), adapterOutputType);
+                        wrapProperties(datasetDetails.getProperties()), adapterOutputType);
             } else {
                 throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
@@ -451,7 +466,8 @@
 
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
                 dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
-                datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+                this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint = null;
         try {
@@ -1484,4 +1500,32 @@
         return FormatUtils.getDefaultFormat();
     }
 
+    /**
+     * Adds the HDFS scheduler and the cluster location constraint into the dataset properties.
+     *
+     * @param properties
+     *            the original dataset properties
+     * @return a new map containing the original dataset properties and the scheduler/locations
+     */
+    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        wrappedProperties.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+        wrappedProperties.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+        return wrappedProperties;
+    }
+
+    /**
+     * Adapts the original properties to a string-object map.
+     *
+     * @param properties
+     *            the original properties
+     * @return the new string-object map
+     */
+    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        return wrappedProperties;
+    }
+
 }
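
The lazy initialization of hdfsScheduler in the constructor above is unsynchronized, so two AqlMetadataProvider instances constructed concurrently could each build a Scheduler. Whether that can actually happen depends on how providers are instantiated; if it can, a synchronized accessor is the usual fix. A sketch, assuming the same Scheduler(String, int) constructor used in the patch:

    import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;

    public class HdfsSchedulerHolder {
        private static Scheduler hdfsScheduler;

        // Synchronized lazy init: concurrent callers can no longer each build a Scheduler.
        public static synchronized Scheduler getOrCreate(String ccHost, int ccPort) throws Exception {
            if (hdfsScheduler == null) {
                hdfsScheduler = new Scheduler(ccHost, ccPort);
            }
            return hdfsScheduler;
        }
    }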
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
         IManagedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
+    private FileSystemBasedAdapter coreAdapter;
+    private String format;
 
-    public static final String KEY_FILE_SYSTEM = "fs";
-    public static final String LOCAL_FS = "localfs";
-    public static final String HDFS = "hdfs";
-
-    private final FileSystemBasedAdapter coreAdapter;
-    private final Map<String, String> configuration;
-    private final String fileSystem;
-    private final String format;
-
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+            FileSystemBasedAdapter coreAdapter, String format) throws Exception {
         super(atype);
-        checkRequiredArgs(configuration);
-        fileSystem = configuration.get(KEY_FILE_SYSTEM);
-        String adapterFactoryClass = null;
-        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
-        } else if (fileSystem.equals(HDFS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
-        } else {
-            throw new AsterixException("Unsupported file system type " + fileSystem);
-        }
-        format = configuration.get(KEY_FORMAT);
-        IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
-                adapterFactoryClass).newInstance();
-        coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
         this.configuration = configuration;
-    }
-
-    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
-        if (configuration.get(KEY_FILE_SYSTEM) == null) {
-            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
-        }
-        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (output-type-name=?)");
-        }
-        if (configuration.get(KEY_PATH) == null) {
-            throw new Exception("File path not specified (path=?)");
-        }
-        if (configuration.get(KEY_FORMAT) == null) {
-            throw new Exception("File format not specified (format=?)");
-        }
+        this.coreAdapter = coreAdapter;
+        this.format = format;
     }
 
     @Override
@@ -103,7 +69,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         coreAdapter.configure(arguments);
     }
 
@@ -189,16 +155,16 @@
 
     private final ARecordType recordType;
     private final IDataParser dataParser;
-    private final Map<String, String> configuration;
+    private final Map<String, Object> configuration;
 
     public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, String> configuration) {
+            char fieldDelimiter, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
         this.configuration = configuration;
     }
 
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new ADMDataParser();
         this.configuration = configuration;
@@ -221,10 +187,10 @@
     public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
 
     public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) {
+            Map<String, Object> configuration) {
         super(ctx, recType);
         this.dataParser = dataParser;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
         if (propValue != null) {
             interTupleInterval = Long.parseLong(propValue);
         } else {
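
For context, INTER_TUPLE_INTERVAL ("tuple-interval") paces ingestion by sleeping between tuples, so a static file replays as a slow feed rather than being ingested at full speed. The idea in isolation, with hypothetical names:

    import java.util.Iterator;

    public class PacedIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private final long interTupleIntervalMs;

        public PacedIterator(Iterator<T> delegate, long interTupleIntervalMs) {
            this.delegate = delegate;
            this.interTupleIntervalMs = interTupleIntervalMs;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            try {
                Thread.sleep(interTupleIntervalMs); // pace the feed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return delegate.next();
        }
    }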
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
  * source file has been ingested.
  */
 public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final String KEY_FILE_SYSTEM = "fs";
+    public static final String LOCAL_FS = "localfs";
+    public static final String HDFS = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_FORMAT = "format";
+
+    private IGenericDatasetAdapterFactory adapterFactory;
+    private String format;
+    private boolean setup = false;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+        if (!setup) {
+            checkRequiredArgs(configuration);
+            String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+            String adapterFactoryClass = null;
+            if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+            } else if (fileSystem.equals(HDFS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+            } else {
+                throw new AsterixException("Unsupported file system type " + fileSystem);
+            }
+            format = (String) configuration.get(KEY_FORMAT);
+            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+            setup = true;
+        }
+        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+                (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
     }
 
     @Override
@@ -39,4 +68,19 @@
         return "file_feed";
     }
 
+    private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+        if (configuration.get(KEY_FILE_SYSTEM) == null) {
+            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+        }
+        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+            throw new Exception("Record type not specified (output-type-name=?)");
+        }
+        if (configuration.get(KEY_PATH) == null) {
+            throw new Exception("File path not specified (path=?)");
+        }
+        if (configuration.get(KEY_FORMAT) == null) {
+            throw new Exception("File format not specified (format=?)");
+        }
+    }
+
 }
\ No newline at end of file
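
A hedged usage sketch of the factory above: the configuration must name the file system, record type, path, and format, or checkRequiredArgs() throws. The path and type-name values below are illustrative only; recordType would come from the metadata catalog.

    import java.util.HashMap;
    import java.util.Map;

    import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
    import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
    import edu.uci.ics.asterix.om.types.ARecordType;

    public class PacedFeedExample {
        static IDatasourceAdapter buildPacedAdapter(ARecordType recordType) throws Exception {
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put(RateControlledFileSystemBasedAdapterFactory.KEY_FILE_SYSTEM, "localfs");
            conf.put(RateControlledFileSystemBasedAdapterFactory.KEY_PATH, "nc1://data/tweets.adm");
            conf.put(RateControlledFileSystemBasedAdapterFactory.KEY_FORMAT, "adm");
            conf.put(IGenericDatasetAdapterFactory.KEY_TYPE_NAME, "TweetType");
            conf.put("tuple-interval", "10"); // milliseconds between tuples
            return new RateControlledFileSystemBasedAdapterFactory().createAdapter(conf, recordType);
        }
    }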