code cleanup for adaptors/factories
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 355382a..d57d424 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -445,8 +445,7 @@
 
                     datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                             InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-                            ngName, adapter, configuration, signature,
-                            FeedDatasetDetails.FeedState.INACTIVE.toString(), null, null);
+                            ngName, adapter, configuration, signature);
                     break;
                 }
             }
@@ -1355,7 +1354,7 @@
         try {
             ControlFeedStatement cfs = (ControlFeedStatement) stmt;
             String dataverseName = getActiveDataverseName(cfs.getDataverseName());
-            
+
             String datasetName = cfs.getDatasetName().getValue();
             Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                     dataverseName, cfs.getDatasetName().getValue());
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 4e3714c..fb4e84e 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "DatasetName": "TweetFeed", "DataTypeName": "TweetType", "DatasetType": "FEED", "InternalDetails": null, "ExternalDetails": null, "FeedDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "id" ], "PrimaryKey": [ "id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "DatasourceAdapter": "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory", "Properties": [ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } ], "Function": null, "Status": "INACTIVE" }, "Hints": {{  }}, "Timestamp": "Tue Jan 29 19:07:24 PST 2013" }
+{ "DataverseName": "feeds", "DatasetName": "TweetFeed", "DataTypeName": "TweetType", "DatasetType": "FEED", "InternalDetails": null, "ExternalDetails": null, "FeedDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "id" ], "PrimaryKey": [ "id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "DatasourceAdapter": "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory", "Properties": [ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } ], "Function": null }, "Hints": {{  }}, "Timestamp": "Tue Jun 11 13:56:43 PDT 2013", "DatasetId": 696, "PendingOp": 0 }
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_02/feeds_02.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_02/feeds_02.1.adm
index 9720960..e69de29 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_02/feeds_02.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_02/feeds_02.1.adm
@@ -1,12 +0,0 @@
-{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
-{ "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson  uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
-{ "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
-{ "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
-{ "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
-{ "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
-{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
-{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
-{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
-{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
-{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
-{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index 8011e4b..d8a0c3f 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "DatasetName": "TweetFeed", "DataTypeName": "TweetType", "DatasetType": "FEED", "InternalDetails": null, "ExternalDetails": null, "FeedDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "id" ], "PrimaryKey": [ "id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "DatasourceAdapter": "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory", "Properties": [ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } ], "Function": "feeds.feed_processor@1", "Status": "INACTIVE" }, "Hints": {{  }}, "Timestamp": "Tue Jan 29 19:08:49 PST 2013" }
+{ "DataverseName": "feeds", "DatasetName": "TweetFeed", "DataTypeName": "TweetType", "DatasetType": "FEED", "InternalDetails": null, "ExternalDetails": null, "FeedDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "id" ], "PrimaryKey": [ "id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "DatasourceAdapter": "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory", "Properties": [ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } ], "Function": "feeds.feed_processor@1" }, "Hints": {{  }}, "Timestamp": "Tue Jun 11 12:28:32 PDT 2013", "DatasetId": 698, "PendingOp": 0 }
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_04/feeds_04.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_04/feeds_04.1.adm
index 2567483..e69de29 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_04/feeds_04.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_04/feeds_04.1.adm
@@ -1,11 +0,0 @@
-{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
-{ "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson  uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
-{ "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
-{ "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
-{ "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
-{ "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
-{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
-{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
-{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
-{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
-{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index 68eac48..465a226 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -14,22 +14,70 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
- * A factory class for creating the @see {CNNFeedAdapter}.  
+ * A factory class for creating the @see {CNNFeedAdapter}.
  */
-public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+public class CNNFeedAdapterFactory implements IAdapterFactory {
     private static final long serialVersionUID = 1L;
 
+    private Map<String, Object> configuration;
+
+    private List<String> feedURLs = new ArrayList<String>();
+    private static Map<String, String> topicFeeds = new HashMap<String, String>();
+
+    public static final String KEY_RSS_URL = "topic";
+    public static final String KEY_INTERVAL = "interval";
+    public static final String TOP_STORIES = "topstories";
+    public static final String WORLD = "world";
+    public static final String US = "us";
+    public static final String SPORTS = "sports";
+    public static final String BUSINESS = "business";
+    public static final String POLITICS = "politics";
+    public static final String CRIME = "crime";
+    public static final String TECHNOLOGY = "technology";
+    public static final String HEALTH = "health";
+    public static final String ENTERNTAINMENT = "entertainemnt";
+    public static final String TRAVEL = "travel";
+    public static final String LIVING = "living";
+    public static final String VIDEO = "video";
+    public static final String STUDENT = "student";
+    public static final String POPULAR = "popular";
+    public static final String RECENT = "recent";
+
+    private void initTopics() {
+        topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
+        topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
+        topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
+        topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
+        topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
+        topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
+        topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
+        topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
+        topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
+        topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
+        topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
+        topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
+        topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
+        topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
+        topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
+        topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
+    }
+
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
-        CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
-        cnnFeedAdapter.configure(configuration);
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, ctx);
         return cnnFeedAdapter;
     }
 
@@ -38,4 +86,53 @@
         return "cnn_feed";
     }
 
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
+        if (rssURLProperty == null) {
+            throw new IllegalArgumentException("no rss url provided");
+        }
+        initializeFeedURLs(rssURLProperty);
+    }
+
+    private void initializeFeedURLs(String rssURLProperty) {
+        feedURLs.clear();
+        String[] rssTopics = rssURLProperty.split(",");
+        initTopics();
+        for (String topic : rssTopics) {
+            String feedURL = topicFeeds.get(topic);
+            if (feedURL == null) {
+                throw new IllegalArgumentException(" unknown topic :" + topic + " please choose from the following "
+                        + getValidTopics());
+            }
+            feedURLs.add(feedURL);
+        }
+    }
+
+    private static String getValidTopics() {
+        StringBuilder builder = new StringBuilder();
+        for (String key : topicFeeds.keySet()) {
+            builder.append(key);
+            builder.append(" ");
+        }
+        return new String(builder);
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 15f702b..5618ef8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -23,10 +24,14 @@
 
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@@ -35,7 +40,7 @@
  * A factory class for creating an instance of HDFSAdapter
  */
 @SuppressWarnings("deprecation")
-public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+public class HDFSAdapterFactory extends FileSystemAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -53,7 +58,21 @@
     private boolean executed[];
     private InputSplitsFactory inputSplitsFactory;
     private ConfFactory confFactory;
-    private boolean setup = false;
+    private IAType atype;
+    private boolean configured = false;
+    public static Scheduler hdfsScheduler = initializeHDFSScheduler();
+
+    private static Scheduler initializeHDFSScheduler() {
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        Scheduler scheduler = null;
+        try {
+            scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+                    .getClusterControllerInfo().getClientNetPort());
+        } catch (HyracksException e) {
+            throw new IllegalStateException("Cannot obtain hdfs scheduler");
+        }
+        return scheduler;
+    }
 
     private static final Map<String, String> formatClassNames = initInputFormatMap();
 
@@ -65,30 +84,12 @@
     }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
-        if (!setup) {
-            /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
-            configureJobConf(configuration);
-            JobConf conf = configureJobConf(configuration);
-            confFactory = new ConfFactory(conf);
-
-            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
-            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
-
-            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
-            inputSplitsFactory = new InputSplitsFactory(inputSplits);
-
-            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
-            readSchedule = scheduler.getLocationConstraints(inputSplits);
-            executed = new boolean[readSchedule.length];
-            Arrays.fill(executed, false);
-
-            setup = true;
-        }
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
         JobConf conf = confFactory.getConf();
         InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
-        hdfsAdapter.configure(configuration);
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName,
+                parserFactory, ctx);
         return hdfsAdapter;
     }
 
@@ -108,4 +109,60 @@
         return conf;
     }
 
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.GENERIC;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        if (!configured) {
+            throw new IllegalStateException("Adapter factory has not been configured yet");
+        }
+        return (AlgebricksPartitionConstraint) clusterLocations;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        JobConf conf = configureJobConf(configuration);
+        confFactory = new ConfFactory(conf);
+
+        clusterLocations = getClusterLocations();
+        int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+        InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+        inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+        readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
+        executed = new boolean[readSchedule.length];
+        Arrays.fill(executed, false);
+        configured = true;
+
+        atype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+        configureFormat(atype);
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    private static AlgebricksPartitionConstraint getClusterLocations() {
+        ArrayList<String> locs = new ArrayList<String>();
+        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+        for (String i : stores.keySet()) {
+            String[] nodeStores = stores.get(i);
+            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
+            for (int j = 0; j < nodeStores.length; j++) {
+                for (int k = 0; k < numIODevices; k++) {
+                    locs.add(i);
+                }
+            }
+        }
+        String[] cluster = new String[locs.size()];
+        cluster = locs.toArray(cluster);
+        return new AlgebricksAbsolutePartitionConstraint(cluster);
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 4fd9411..16e6ef7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -23,11 +23,13 @@
 
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@@ -36,7 +38,7 @@
  * A factory class for creating an instance of HiveAdapter
  */
 @SuppressWarnings("deprecation")
-public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+public class HiveAdapterFactory extends FileSystemAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -66,7 +68,8 @@
     private InputSplitsFactory inputSplitsFactory;
     private ConfFactory confFactory;
     private transient AlgebricksPartitionConstraint clusterLocations;
-    private boolean setup = false;
+    private boolean configured = false;
+    private IAType atype;
 
     private static final Map<String, String> formatClassNames = initInputFormatMap();
 
@@ -78,8 +81,33 @@
     }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
-        if (!setup) {
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations,
+                nodeName, parserFactory, ctx);
+        return hiveAdapter;
+    }
+
+    @Override
+    public String getName() {
+        return "hive";
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.GENERIC;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        if (!configured) {
             /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
             configureJobConf(configuration);
             JobConf conf = configureJobConf(configuration);
@@ -96,18 +124,11 @@
             executed = new boolean[readSchedule.length];
             Arrays.fill(executed, false);
 
-            setup = true;
+            atype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+            configureFormat(atype);
+            configured = true;
         }
-        JobConf conf = confFactory.getConf();
-        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
-        hiveAdapter.configure(configuration);
-        return hiveAdapter;
-    }
 
-    @Override
-    public String getName() {
-        return "hive";
     }
 
     private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
@@ -122,7 +143,7 @@
             tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
                     + configuration.get(HIVE_TABLE);
         }
-        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
+        configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath);
         if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
             throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
         }
@@ -142,4 +163,11 @@
                 (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
         return conf;
     }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 02f04e8..3d8bedf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -14,26 +14,35 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.io.File;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 
 /**
  * Factory class for creating an instance of NCFileSystemAdapter. An
  * NCFileSystemAdapter reads external data residing on the local file system of
  * an NC.
  */
-public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
+public class NCFileSystemAdapterFactory extends FileSystemAdapterFactory {
     private static final long serialVersionUID = 1L;
+
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
 
+    private IAType sourceDatatype;
+    private FileSplit[] fileSplits;
+
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
-        NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
-        fsAdapter.configure(configuration);
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
         return fsAdapter;
     }
 
@@ -41,4 +50,60 @@
     public String getName() {
         return NC_FILE_SYSTEM_ADAPTER_NAME;
     }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.GENERIC;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        String[] splits = ((String) configuration.get(KEY_PATH)).split(",");
+        IAType sourceDatatype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+        configureFileSplits(splits);
+        configureFormat(sourceDatatype);
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return configurePartitionConstraint();
+    }
+
+    private void configureFileSplits(String[] splits) throws AsterixException {
+        if (fileSplits == null) {
+            fileSplits = new FileSplit[splits.length];
+            String nodeName;
+            String nodeLocalPath;
+            int count = 0;
+            String trimmedValue;
+            for (String splitPath : splits) {
+                trimmedValue = splitPath.trim();
+                if (!trimmedValue.contains("://")) {
+                    throw new AsterixException("Invalid path: " + splitPath
+                            + "\nUsage- path=\"Host://Absolute File Path\"");
+                }
+                nodeName = trimmedValue.split(":")[0];
+                nodeLocalPath = trimmedValue.split("://")[1];
+                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+                fileSplits[count++] = fileSplit;
+            }
+        }
+    }
+
+    private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
+        String[] locs = new String[fileSplits.length];
+        String location;
+        for (int i = 0; i < fileSplits.length; i++) {
+            location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+            locs[i] = location;
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locs);
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bd181e5..5937013 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -17,23 +17,26 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Factory class for creating an instance of PullBasedTwitterAdapter.
  * This adapter provides the functionality of fetching tweets from Twitter service
  * via pull-based Twitter API.
  */
-public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
+public class PullBasedTwitterAdapterFactory implements IAdapterFactory {
     private static final long serialVersionUID = 1L;
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
+    private Map<String, Object> configuration;
+
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
-        PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
-        twitterAdapter.configure(configuration);
-        return twitterAdapter;
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new PullBasedTwitterAdapter(configuration, ctx);
     }
 
     @Override
@@ -41,4 +44,24 @@
         return PULL_BASED_TWITTER_ADAPTER_NAME;
     }
 
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index 6e732ba..d904af9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -14,24 +14,38 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Factory class for creating an instance of @see {RSSFeedAdapter}.
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
-public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+public class RSSFeedAdapterFactory implements IAdapterFactory {
     private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
+    public static final String KEY_RSS_URL = "url";
+    public static final String KEY_INTERVAL = "interval";
+
+    private Map<String, Object> configuration;
+    private ARecordType recordType;
+    private List<String> feedURLs = new ArrayList<String>();
+
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
-        RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
-        rssFeedAdapter.configure(configuration);
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, ctx);
         return rssFeedAdapter;
     }
 
@@ -40,4 +54,46 @@
         return "rss_feed";
     }
 
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
+        if (rssURLProperty == null) {
+            throw new IllegalArgumentException("no rss url provided");
+        }
+        initializeFeedURLs(rssURLProperty);
+        configurePartitionConstraints();
+        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                false);
+
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(feedURLs.size());
+    }
+
+    private void initializeFeedURLs(String rssURLProperty) {
+        feedURLs.clear();
+        String[] feedURLProperty = rssURLProperty.split(",");
+        for (String feedURL : feedURLProperty) {
+            feedURLs.add(feedURL);
+        }
+    }
+
+    protected void configurePartitionConstraints() {
+
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index cfcfd74..5debc53 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -14,95 +14,23 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * An Adapter that provides the functionality of fetching news feed from CNN service
  * The Adapter provides news feed as ADM records.
  */
-public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IManagedFeedAdapter {
+public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IFeedAdapter {
 
     private static final long serialVersionUID = 2523303758114582251L;
-    private List<String> feedURLs = new ArrayList<String>();
-    private static Map<String, String> topicFeeds = new HashMap<String, String>();
 
-    public static final String KEY_RSS_URL = "topic";
-    public static final String KEY_INTERVAL = "interval";
-    public static final String TOP_STORIES = "topstories";
-    public static final String WORLD = "world";
-    public static final String US = "us";
-    public static final String SPORTS = "sports";
-    public static final String BUSINESS = "business";
-    public static final String POLITICS = "politics";
-    public static final String CRIME = "crime";
-    public static final String TECHNOLOGY = "technology";
-    public static final String HEALTH = "health";
-    public static final String ENTERNTAINMENT = "entertainemnt";
-    public static final String TRAVEL = "travel";
-    public static final String LIVING = "living";
-    public static final String VIDEO = "video";
-    public static final String STUDENT = "student";
-    public static final String POPULAR = "popular";
-    public static final String RECENT = "recent";
-
-    private void initTopics() {
-        topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
-        topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
-        topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
-        topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
-        topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
-        topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
-        topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
-        topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
-        topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
-        topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
-        topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
-        topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
-        topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
-        topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
-        topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
-        topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
-    }
-
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        configuration = arguments;
-        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
-        if (rssURLProperty == null) {
-            throw new IllegalArgumentException("no rss url provided");
-        }
-        initializeFeedURLs(rssURLProperty);
-        configurePartitionConstraints();
-
-    }
-
-    private void initializeFeedURLs(String rssURLProperty) {
-        feedURLs.clear();
-        String[] rssTopics = rssURLProperty.split(",");
-        initTopics();
-        for (String topic : rssTopics) {
-            String feedURL = topicFeeds.get(topic);
-            if (feedURL == null) {
-                throw new IllegalArgumentException(" unknown topic :" + topic + " please choose from the following "
-                        + getValidTopics());
-            }
-            feedURLs.add(feedURL);
-        }
-    }
-
-    private static String getValidTopics() {
-        StringBuilder builder = new StringBuilder();
-        for (String key : topicFeeds.keySet()) {
-            builder.append(key);
-            builder.append(" ");
-        }
-        return new String(builder);
+    public CNNFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+        super(configuration, ctx);
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 6ce18fa..1897272 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -16,145 +16,36 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.util.DNSResolverFactory;
-import edu.uci.ics.asterix.external.util.INodeResolver;
-import edu.uci.ics.asterix.external.util.INodeResolverFactory;
-import edu.uci.ics.asterix.metadata.feeds.AbstractDatasourceAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public abstract class FileSystemBasedAdapter extends AbstractDatasourceAdapter {
+public abstract class FileSystemBasedAdapter implements IDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
 
     public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-    public static final String KEY_DELIMITER = "delimiter";
-    public static final String KEY_PATH = "path";
-
-    protected ITupleParserFactory parserFactory;
-    protected ITupleParser parser;
-    protected static INodeResolver nodeResolver;
-
-    private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
-    private static final Logger LOGGER = Logger.getLogger(FileSystemBasedAdapter.class.getName());
 
     public abstract InputStream getInputStream(int partition) throws IOException;
 
-    public FileSystemBasedAdapter(IAType atype) {
-        this.atype = atype;
+    protected final ITupleParser tupleParser;
+    protected final IAType sourceDatatype;
+    protected IHyracksTaskContext ctx;
+
+    public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx) {
+        this.tupleParser = parserFactory.createTupleParser(ctx);
+        this.sourceDatatype = sourceDatatype;
+        this.ctx = ctx;
     }
 
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
         InputStream in = getInputStream(partition);
-        parser = getTupleParser();
-        parser.parse(in, writer);
+        tupleParser.parse(in, writer);
     }
 
-    @Override
-    public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
-
-    @Override
-    public abstract void configure(Map<String, Object> arguments) throws Exception;
-
-    @Override
-    public abstract AdapterType getAdapterType();
-
-    @Override
-    public abstract AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
-    protected ITupleParser getTupleParser() throws Exception {
-        return parserFactory.createTupleParser(ctx);
-    }
-
-    protected void configureFormat() throws Exception {
-        String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
-        if (parserFactoryClassname == null) {
-            String specifiedFormat = (String) configuration.get(KEY_FORMAT);
-            if (specifiedFormat == null) {
-                throw new IllegalArgumentException(" Unspecified data format");
-            } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
-            } else if (FORMAT_ADM.equalsIgnoreCase((String)configuration.get(KEY_FORMAT))) {
-                parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
-            } else {
-                throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
-            }
-        } else {
-            parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname).newInstance();
-        }
-
-    }
-
-    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType) throws AsterixException {
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
-            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-        }
-        String delimiterValue = (String) configuration.get(KEY_DELIMITER);
-        if (delimiterValue != null && delimiterValue.length() > 1) {
-            throw new AsterixException("improper delimiter");
-        }
-
-        Character delimiter = delimiterValue.charAt(0);
-        return new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
-    }
-
-    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType) throws AsterixException {
-        try {
-            return new AdmSchemafullRecordParserFactory(recordType);
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    protected INodeResolver getNodeResolver() {
-        if (nodeResolver == null) {
-            nodeResolver = initNodeResolver();
-        }
-        return nodeResolver;
-    }
-
-    private static INodeResolver initNodeResolver() {
-        INodeResolver nodeResolver = null;
-        String configuredNodeResolverFactory = System.getProperty(NODE_RESOLVER_FACTORY_PROPERTY);
-        if (configuredNodeResolverFactory != null) {
-            try {
-                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
-                        .createNodeResolver();
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
-                            + configuredNodeResolverFactory + "\n" + e.getMessage());
-                }
-                nodeResolver = DEFAULT_NODE_RESOLVER;
-            }
-        } else {
-            nodeResolver = DEFAULT_NODE_RESOLVER;
-        }
-        return nodeResolver;
-    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index f8b381b..11f44ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -31,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
  * Provides functionality for fetching external data stored in an HDFS instance.
@@ -44,34 +44,16 @@
     private transient boolean executed[];
     private transient InputSplit[] inputSplits;
     private transient JobConf conf;
-    private transient AlgebricksPartitionConstraint clusterLocations;
-
     private transient String nodeName;
 
     public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
-            AlgebricksPartitionConstraint clusterLocations) {
-        super(atype);
+            String nodeName, ITupleParserFactory parserFactory, IHyracksTaskContext ctx) {
+        super(parserFactory, atype, ctx);
         this.readSchedule = readSchedule;
         this.executed = executed;
         this.inputSplits = inputSplits;
         this.conf = conf;
-        this.clusterLocations = clusterLocations;
-    }
-
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        this.configuration = arguments;
-        configureFormat();
-    }
-
-    public AdapterType getAdapterType() {
-        return AdapterType.READ_WRITE;
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        this.ctx = ctx;
-        this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        this.nodeName = nodeName;
     }
 
     private Reporter getReporter() {
@@ -227,9 +209,4 @@
 
     }
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return clusterLocations;
-    }
-
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 39484a6..8e5adfc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -14,21 +14,22 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
 
-import edu.uci.ics.asterix.metadata.feeds.AbstractDatasourceAdapter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
  * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
  */
 @SuppressWarnings("deprecation")
-public class HiveAdapter extends AbstractDatasourceAdapter {
+public class HiveAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -42,35 +43,16 @@
     private HDFSAdapter hdfsAdapter;
 
     public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
-            AlgebricksPartitionConstraint clusterLocations) {
-        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
-        this.atype = atype;
+            AlgebricksPartitionConstraint clusterLocations, String nodeName, ITupleParserFactory parserFactory,
+            IHyracksTaskContext ctx) {
+        super(parserFactory, atype, ctx);
+        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName, parserFactory,
+                ctx);
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.READ;
-    }
-
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        this.configuration = arguments;
-        this.hdfsAdapter.configure(arguments);
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        hdfsAdapter.initialize(ctx);
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        hdfsAdapter.start(partition, writer);
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return hdfsAdapter.getPartitionConstraint();
+    public InputStream getInputStream(int partition) throws IOException {
+        return hdfsAdapter.getInputStream(partition);
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 182ccf7..f3075d0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -19,15 +19,11 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Map;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
  * Factory class for creating an instance of NCFileSystemAdapter. An
@@ -37,59 +33,13 @@
 public class NCFileSystemAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
-    private FileSplit[] fileSplits;
 
-    public NCFileSystemAdapter(IAType atype) {
-        super(atype);
-    }
+    private final FileSplit[] fileSplits;
 
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        this.configuration = arguments;
-        String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
-        configureFileSplits(splits);
-        configureFormat();
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        this.ctx = ctx;
-    }
-
-    @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.READ;
-    }
-
-    private void configureFileSplits(String[] splits) throws AsterixException {
-        if (fileSplits == null) {
-            fileSplits = new FileSplit[splits.length];
-            String nodeName;
-            String nodeLocalPath;
-            int count = 0;
-            String trimmedValue;
-            for (String splitPath : splits) {
-                trimmedValue = splitPath.trim();
-                if (!trimmedValue.contains("://")) {
-                    throw new AsterixException("Invalid path: " + splitPath
-                            + "\nUsage- path=\"Host://Absolute File Path\"");
-                }
-                nodeName = trimmedValue.split(":")[0];
-                nodeLocalPath = trimmedValue.split("://")[1];
-                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
-                fileSplits[count++] = fileSplit;
-            }
-        }
-    }
-
-    private void configurePartitionConstraint() throws AsterixException {
-        String[] locs = new String[fileSplits.length];
-        String location;
-        for (int i = 0; i < fileSplits.length; i++) {
-            location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
-            locs[i] = location;
-        }
-        partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locs);
+    public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
+            IHyracksTaskContext ctx) {
+        super(parserFactory, atype, ctx);
+        this.fileSplits = fileSplits;
     }
 
     @Override
@@ -105,11 +55,4 @@
         }
     }
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            configurePartitionConstraint();
-        }
-        return partitionConstraint;
-    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 0223700..534b495 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -20,11 +20,11 @@
 
 import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
 import edu.uci.ics.asterix.metadata.feeds.AbstractFeedDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
-import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -35,8 +35,8 @@
  * Captures the common logic for obtaining bytes from an external source
  * and packing them into frames as tuples.
  */
-public abstract class PullBasedAdapter extends AbstractFeedDatasourceAdapter implements ITypedDatasourceAdapter,
-        IManagedFeedAdapter {
+public abstract class PullBasedAdapter extends AbstractFeedDatasourceAdapter implements IDatasourceAdapter,
+        IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
@@ -50,9 +50,16 @@
     protected boolean alterRequested = false;
     private Map<String, Object> modifiedConfiguration = null;
     private long tupleCount = 0;
+    private final IHyracksTaskContext ctx;
+    protected Map<String, Object> configuration;
 
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
+    public PullBasedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+        this.configuration = configuration;
+    }
+
     public long getIngestedRecordsCount() {
         return tupleCount;
     }
@@ -122,11 +129,6 @@
         }
     }
 
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return adapterOutputType;
-    }
-
     /**
      * Discontinue the ingestion of data and end the feed.
      * 
@@ -138,4 +140,8 @@
         timer.cancel();
     }
 
+    public Map<String, Object> getConfiguration() {
+        return configuration;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 97b1d77..a9820b7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -16,19 +16,18 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * An adapter that provides the functionality of receiving tweets from the
  * Twitter service in the form of ADM formatted records.
  */
-public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
+public class PullBasedTwitterAdapter extends PullBasedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -36,7 +35,7 @@
     public static final String INTERVAL = "interval";
 
     private ARecordType recordType;
-
+    private final IHyracksTaskContext ctx;
     private PullBasedTwitterFeedClient tweetClient;
 
     @Override
@@ -44,27 +43,17 @@
         return tweetClient;
     }
 
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        configuration = arguments;
+    public PullBasedTwitterAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+        super(configuration, ctx);
         String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                 BuiltinType.ASTRING };
         recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
         tweetClient = new PullBasedTwitterFeedClient(ctx, this);
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.READ;
-    }
-
-    @Override
     public void stop() {
         tweetClient.stop();
     }
@@ -86,12 +75,4 @@
         return recordType;
     }
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            partitionConstraint = new AlgebricksCountPartitionConstraint(1);
-        }
-        return partitionConstraint;
-    }
-
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index c3c5ae0..bd503f2 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,18 +19,17 @@
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
-public class RSSFeedAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
+public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -43,13 +42,18 @@
 
     private IPullBasedFeedClient rssFeedClient;
 
-    public static final String KEY_RSS_URL = "url";
-    public static final String KEY_INTERVAL = "interval";
-
     public boolean isStopRequested() {
         return isStopRequested;
     }
 
+    public RSSFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+        super(configuration, ctx);
+        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                false);
+        id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
+    }
+
     public void setStopRequested(boolean isStopRequested) {
         this.isStopRequested = isStopRequested;
     }
@@ -66,25 +70,6 @@
         isStopRequested = true;
     }
 
-    @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.READ;
-    }
-
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        configuration = arguments;
-        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
-        if (rssURLProperty == null) {
-            throw new IllegalArgumentException("no rss url provided");
-        }
-        initializeFeedURLs(rssURLProperty);
-        configurePartitionConstraints();
-        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
-                false);
-    }
-
     private void initializeFeedURLs(String rssURLProperty) {
         feedURLs.clear();
         String[] feedURLProperty = rssURLProperty.split(",");
@@ -94,22 +79,12 @@
     }
 
     protected void reconfigure(Map<String, Object> arguments) {
-        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get("KEY_RSS_URL");
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
     }
 
-    protected void configurePartitionConstraints() {
-        partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        this.ctx = ctx;
-        id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
-    }
-
     public boolean isAlterRequested() {
         return isAlterRequested;
     }
@@ -126,17 +101,8 @@
         return rssFeedClient;
     }
 
-    @Override
     public ARecordType getAdapterOutputType() {
         return recordType;
     }
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            configurePartitionConstraints();
-        }
-        return partitionConstraint;
-    }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index f922169..f6106e6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -162,18 +162,14 @@
     public static final int FEED_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX = 5;
     public static final int FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX = 6;
     public static final int FEED_DETAILS_ARECORD_FUNCTION_FIELD_INDEX = 7;
-    public static final int FEED_DETAILS_ARECORD_STATE_FIELD_INDEX = 8;
-    public static final int FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX = 9;
-    public static final int FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX = 10;
 
     private static final ARecordType createFeedDetailsRecordType() throws AsterixException {
         AOrderedListType orderedListType = new AOrderedListType(BuiltinType.ASTRING, null);
         AOrderedListType orderedListOfPropertiesType = new AOrderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE,
                 null);
-        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
 
         String[] fieldNames = { "FileStructure", "PartitioningStrategy", "PartitioningKey", "PrimaryKey", "GroupName",
-                "DatasourceAdapter", "Properties", "Function", "Status", "IngestNodes", "ComputeNodes" };
+                "DatasourceAdapter", "Properties", "Function" };
 
         List<IAType> feedFunctionUnionList = new ArrayList<IAType>();
         feedFunctionUnionList.add(BuiltinType.ANULL);
@@ -181,8 +177,7 @@
         AUnionType feedFunctionUnion = new AUnionType(feedFunctionUnionList, null);
 
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListType, orderedListType,
-                BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListOfPropertiesType, feedFunctionUnion,
-                BuiltinType.ASTRING, unorderedListType, unorderedListType };
+                BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListOfPropertiesType, feedFunctionUnion };
 
         return new ARecordType(null, fieldNames, fieldTypes, true);
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index dc072d4..b639559 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -46,32 +46,36 @@
 
     private ClusterManager() {
         Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
-        String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
-        String eventHome = asterixCluster.getWorkingDir().getDir();
-        File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml");
-        Configuration configuration = null;
+        String eventHome = asterixCluster == null ? null : asterixCluster.getWorkingDir().getDir();
 
-        try {
-            JAXBContext configCtx = JAXBContext.newInstance(Configuration.class);
-            Unmarshaller unmarshaller = configCtx.createUnmarshaller();
-            configuration = (Configuration) unmarshaller.unmarshal(configFile);
-            AsterixEventService.initialize(configuration, asterixDir, eventHome);
-            client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
+        if (asterixCluster != null) {
+            String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
+            File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml");
+            Configuration configuration = null;
 
-            lookupService = ServiceProvider.INSTANCE.getLookupService();
-            if (!lookupService.isRunning(configuration)) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Lookup service not running. Starting lookup service ...");
+            try {
+                JAXBContext configCtx = JAXBContext.newInstance(Configuration.class);
+                Unmarshaller unmarshaller = configCtx.createUnmarshaller();
+                configuration = (Configuration) unmarshaller.unmarshal(configFile);
+                AsterixEventService.initialize(configuration, asterixDir, eventHome);
+                client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
+                        .getCluster());
+
+                lookupService = ServiceProvider.INSTANCE.getLookupService();
+                if (!lookupService.isRunning(configuration)) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Lookup service not running. Starting lookup service ...");
+                    }
+                    lookupService.startService(configuration);
+                } else {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Lookup service running");
+                    }
                 }
-                lookupService.startService(configuration);
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Lookup service running");
-                }
+
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to initialize cluster manager" + e);
             }
-
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to initialize cluster manager" + e);
         }
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b2ccbc0..8c03198 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -62,11 +62,9 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -119,6 +117,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -168,8 +167,6 @@
     private final AsterixStorageProperties storageProperties;
 
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
-    private static Scheduler hdfsScheduler;
-    public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static transient String SCHEDULER = "hdfs-scheduler";
 
     public String getPropertyValue(String propertyName) {
@@ -192,16 +189,6 @@
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
-        try {
-            if (hdfsScheduler == null) {
-                //set the singleton hdfs scheduler
-                hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
-                        .getClusterControllerInfo().getClientNetPort());
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
     }
 
     public void setJobId(JobId jobId) {
@@ -355,8 +342,7 @@
             throw new AlgebricksException("Can only scan datasets of records.");
         }
 
-        IGenericDatasetAdapterFactory adapterFactory;
-        IDatasourceAdapter adapter;
+        IAdapterFactory adapterFactory;
         String adapterName;
         DatasourceAdapter adapterEntity;
         String adapterFactoryClassname;
@@ -366,17 +352,18 @@
                     adapterName);
             if (adapterEntity != null) {
                 adapterFactoryClassname = adapterEntity.getClassname();
-                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             } else {
                 adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
                 if (adapterFactoryClassname == null) {
                     throw new AlgebricksException(" Unknown adapter :" + adapterName);
                 }
-                adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-                    wrapProperties(datasetDetails.getProperties()), itemType);
+            Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
+            configuration.put("source-datatype", itemType);
+            adapterFactory.configure(configuration);
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -384,21 +371,20 @@
             throw new AlgebricksException("Unable to create adapter " + e);
         }
 
-        if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
-                IDatasourceAdapter.AdapterType.READ_WRITE))) {
+        if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ) || adapterFactory
+                .getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
             throw new AlgebricksException("external dataset adapter does not support read operation");
         }
-        ARecordType rt = (ARecordType) itemType;
 
         ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
-        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
-            constraint = adapter.getPartitionConstraint();
+            constraint = adapterFactory.getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -451,26 +437,28 @@
                 adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
                 if (adapterFactoryClassname != null) {
                 } else {
-                    // adapterName has been provided as a fully qualified
-                    // classname
                     adapterFactoryClassname = adapterName;
                 }
                 adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
-                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
-                        .getProperties()));
-                adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
-            } else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
-                String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
-                adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
-                        outputTypeName).getDatatype();
-                adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-                        wrapProperties(datasetDetails.getProperties()), adapterOutputType);
-            } else {
-                throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
+            Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
+       
+            switch (adapterFactory.getAdapterType()) {
+                case TYPED:
+                    adapterOutputType = null;
+                    break;
+                case GENERIC:
+                    String outputTypeName = datasetDetails.getProperties().get("output-type-name");
+                    adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
+                            outputTypeName).getDatatype();
+                    break;
+                default:
+                    throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
+            configuration.put("source-datatype", adapterOutputType);
+            adapterFactory.configure(configuration);
+
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -485,17 +473,13 @@
         FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
                 BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
 
-        // public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
-        //        Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
-
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
-                dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
-                this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
-                feedPolicy.getProperties());
+                dataset.getDataverseName(), dataset.getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
+                feedDesc, feedPolicy.getProperties());
 
         AlgebricksPartitionConstraint constraint = null;
         try {
-            constraint = adapter.getPartitionConstraint();
+            constraint = adapterFactory.getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1537,8 +1521,8 @@
     private Map<String, Object> wrapProperties(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
-        wrappedProperties.put(SCHEDULER, hdfsScheduler);
-        wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
+        //wrappedProperties.put(SCHEDULER, hdfsScheduler);
+        //wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
         return wrappedProperties;
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
index 81be3b5..e2268bf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
@@ -23,7 +23,6 @@
 import edu.uci.ics.asterix.builders.IARecordBuilder;
 import edu.uci.ics.asterix.builders.OrderedListBuilder;
 import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
@@ -32,7 +31,6 @@
 import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.types.AOrderedListType;
-import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -49,9 +47,6 @@
     private final String adapterFactory;
     private final Map<String, String> properties;
     private final FunctionSignature signature;
-    private FeedState feedState;
-    private List<String> ingestNodes;
-    private List<String> computeNodes;
 
     public enum FeedState {
         // INACTIVE state signifies that the feed dataset is not
@@ -66,15 +61,11 @@
 
     public FeedDatasetDetails(FileStructure fileStructure, PartitioningStrategy partitioningStrategy,
             List<String> partitioningKey, List<String> primaryKey, String groupName, String adapterFactory,
-            Map<String, String> properties, FunctionSignature signature, String feedState, List<String> ingestNodes,
-            List<String> computeNodes) {
+            Map<String, String> properties, FunctionSignature signature) {
         super(fileStructure, partitioningStrategy, partitioningKey, primaryKey, groupName);
         this.properties = properties;
         this.adapterFactory = adapterFactory;
         this.signature = signature;
-        this.feedState = feedState.equals(FeedState.ACTIVE.toString()) ? FeedState.ACTIVE : FeedState.INACTIVE;
-        this.ingestNodes = ingestNodes;
-        this.computeNodes = computeNodes;
     }
 
     @Override
@@ -162,44 +153,6 @@
             feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
         }
 
-        // write field 8
-        fieldValue.reset();
-        aString.setValue(getFeedState().toString());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_STATE_FIELD_INDEX, fieldValue);
-
-        // write field 9
-        UnorderedListBuilder unorderedlistBuilder = new UnorderedListBuilder();
-        unorderedlistBuilder
-                .reset((AUnorderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX]);
-        itemValue = new ArrayBackedValueStorage();
-        if (ingestNodes != null) {
-            for (String node : ingestNodes) {
-                itemValue.reset();
-                aString.setValue(node);
-                listBuilder.addItem(itemValue);
-            }
-        }
-        fieldValue.reset();
-        unorderedlistBuilder.write(fieldValue.getDataOutput(), true);
-        feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
-
-        // write field 10
-        unorderedlistBuilder = new UnorderedListBuilder();
-        unorderedlistBuilder
-                .reset((AUnorderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
-        itemValue = new ArrayBackedValueStorage();
-        if (computeNodes != null) {
-            for (String node : computeNodes) {
-                itemValue.reset();
-                aString.setValue(node);
-                listBuilder.addItem(itemValue);
-            }
-        }
-        fieldValue.reset();
-        unorderedlistBuilder.write(fieldValue.getDataOutput(), true);
-        feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
-
         try {
             feedRecordBuilder.write(out, true);
         } catch (IOException | AsterixException e) {
@@ -235,14 +188,6 @@
         }
     }
 
-    public FeedState getFeedState() {
-        return feedState;
-    }
-
-    public void setFeedState(FeedState feedState) {
-        this.feedState = feedState;
-    }
-
     public String getAdapterFactory() {
         return adapterFactory;
     }
@@ -255,20 +200,4 @@
         return signature;
     }
 
-    public List<String> getIngestNodes() {
-        return ingestNodes;
-    }
-
-    public void setIngestNodes(List<String> ingestNodes) {
-        this.ingestNodes = ingestNodes;
-    }
-
-    public List<String> getComputeNodes() {
-        return computeNodes;
-    }
-
-    public void setComputeNodes(List<String> computeNodes) {
-        this.computeNodes = computeNodes;
-    }
-
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 65b2d78..9ba5848 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -166,26 +166,8 @@
                             Integer.parseInt(nameComponents[1]));
                 }
 
-                String feedState = ((AString) datasetDetailsRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_STATE_FIELD_INDEX)).getStringValue();
-
-                cursor = ((AUnorderedList) datasetDetailsRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
-                List<String> ingestNodes = new ArrayList<String>();
-                while (cursor.next()) {
-                    ingestNodes.add(((AString) cursor.get()).getStringValue());
-                }
-
-                cursor = ((AUnorderedList) datasetDetailsRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
-                List<String> computeNodes = new ArrayList<String>();
-                while (cursor.next()) {
-                    computeNodes.add(((AString) cursor.get()).getStringValue());
-                }
-
                 datasetDetails = new FeedDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
-                        partitioningKey, groupName, adapter, properties, signature, feedState, ingestNodes,
-                        computeNodes);
+                        partitioningKey, groupName, adapter, properties, signature);
                 break;
             }
             case INTERNAL: {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
index 7684969..9e8e5f7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
@@ -17,17 +17,9 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 
 /**
  * Represents the base class that is required to be extended by every
@@ -41,16 +33,6 @@
     protected transient AlgebricksPartitionConstraint partitionConstraint;
     protected IAType atype;
     protected IHyracksTaskContext ctx;
-    protected AdapterType adapterType;
-
-    protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-    static {
-        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-    }
 
     protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
 
@@ -66,29 +48,4 @@
         return map;
     }
 
-    /**
-     * Get the partition constraint chosen by the adapter.
-     * An adapter may have preferences as to where it needs to be instantiated and used.
-     */
-    public abstract AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
-    /**
-     * Get the configured value from the adapter configuration parameters, corresponding to the an attribute.
-     * 
-     * @param attribute
-     *            The attribute whose value needs to be obtained.
-     */
-    public Object getAdapterProperty(String attribute) {
-        return configuration.get(attribute);
-    }
-
-    /**
-     * Get the adapter configuration parameters.
-     * 
-     * @return A Map<String,String> instance representing the adapter configuration.
-     */
-    public Map<String, Object> getConfiguration() {
-        return configuration;
-    }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
index 08d6972..96dc8dc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
@@ -12,7 +12,7 @@
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 
-public abstract class AbstractFeedDatasourceAdapter extends AbstractDatasourceAdapter {
+public abstract class AbstractFeedDatasourceAdapter implements IDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index 07e1594..c5937f6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -14,9 +14,6 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.util.Map;
-
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -34,17 +31,13 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final Map<String, Object> adapterConfiguration;
-    private final IAType atype;
-    private IGenericDatasetAdapterFactory datasourceAdapterFactory;
+    private IAdapterFactory adapterFactory;
 
-    public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
-            RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
+            IAdapterFactory dataSourceAdapterFactory) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
-        this.adapterConfiguration = arguments;
-        this.atype = atype;
-        this.datasourceAdapterFactory = dataSourceAdapterFactory;
+        this.adapterFactory = dataSourceAdapterFactory;
     }
 
     @Override
@@ -58,9 +51,7 @@
                 writer.open();
                 IDatasourceAdapter adapter = null;
                 try {
-                    adapter = ((IGenericDatasetAdapterFactory) datasourceAdapterFactory).createAdapter(
-                            adapterConfiguration, atype);
-                    adapter.initialize(ctx);
+                    adapter = ((IAdapterFactory) adapterFactory).createAdapter(ctx);
                     adapter.start(partition, writer);
                 } catch (Exception e) {
                     throw new HyracksDataException("exception during reading from external data source", e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 96be46a..4270fe2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -34,20 +34,16 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final String adapterFactoryClassName;
-    private final Map<String, Object> adapterConfiguration;
     private final IAType atype;
     private final FeedId feedId;
     private final Map<String, String> feedPolicy;
+    private IAdapterFactory adapterFactory;
 
-    private transient IAdapterFactory datasourceAdapterFactory;
-
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
-            Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, IAdapterFactory adapterFactory,
+            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
         super(spec, 1, 1);
         recordDescriptors[0] = rDesc;
-        this.adapterFactoryClassName = adapter;
-        this.adapterConfiguration = arguments;
+        this.adapterFactory = adapterFactory;
         this.atype = atype;
         this.feedId = feedId;
         this.feedPolicy = feedPolicy;
@@ -56,19 +52,9 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        ITypedDatasourceAdapter adapter;
+        IDatasourceAdapter adapter;
         try {
-            datasourceAdapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassName).newInstance();
-            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
-                adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
-                        .createAdapter(adapterConfiguration, atype);
-            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
-                adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
-                        .createAdapter(adapterConfiguration);
-            } else {
-                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
-            }
-            adapter.initialize(ctx);
+            adapter = adapterFactory.createAdapter(ctx);
         } catch (Exception e) {
             throw new HyracksDataException("initialization of adapter failed", e);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index f6bde26..97153f0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -50,23 +50,25 @@
 
     @Override
     public void open() throws HyracksDataException {
-        if (adapter instanceof IManagedFeedAdapter) {
-            feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
+        if (adapter instanceof IFeedAdapter) {
+            feedInboxMonitor = new FeedInboxMonitor((IFeedAdapter) adapter, inbox, partition);
             AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
             feedManager.registerFeedMsgQueue(feedId, inbox);
         }
         writer.open();
         try {
-            ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+            if (adapter instanceof AbstractFeedDatasourceAdapter) {
+                ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+            }
             adapter.start(partition, writer);
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
         } finally {
             writer.close();
-            if (adapter instanceof IManagedFeedAdapter) {
+            if (adapter instanceof IFeedAdapter) {
                 try {
-                    ((IManagedFeedAdapter) adapter).stop();
+                    ((IFeedAdapter) adapter).stop();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -98,9 +100,9 @@
 class FeedInboxMonitor extends Thread {
 
     private LinkedBlockingQueue<IFeedMessage> inbox;
-    private final IManagedFeedAdapter adapter;
+    private final IFeedAdapter adapter;
 
-    public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+    public FeedInboxMonitor(IFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
         this.inbox = inbox;
         this.adapter = adapter;
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
index 85e09d4..cc8e165 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
@@ -15,6 +15,10 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
@@ -24,9 +28,72 @@
 public interface IAdapterFactory extends Serializable {
 
     /**
+     * A 'GENERIC' adapter can be configured to return a given datatype.
+     * A 'TYPED' adapter returns records with a pre-defined datatype.
+     */
+    public enum AdapterType {
+        GENERIC,
+        TYPED
+    }
+
+    public enum SupportedOperation {
+        READ,
+        WRITE,
+        READ_WRITE
+    }
+
+    /**
+     * Returns the type of adapter indicating if the adapter can be used for
+     * reading from an external data source or writing to an external data
+     * source or can be used for both purposes.
+     * 
+     * @see SupportedOperation
+     * @return
+     */
+    public SupportedOperation getSupportedOperations();
+
+    /**
      * Returns the display name corresponding to the Adapter type that is created by the factory.
      * 
      * @return the display name
      */
     public String getName();
+
+    /**
+     * Returns the type of the adapter (GENERIC or TYPED)
+     * 
+     * @return
+     */
+    public AdapterType getAdapterType();
+
+    /**
+     * @param configuration
+     */
+    public void configure(Map<String, Object> configuration) throws Exception;
+
+    /**
+     * Returns a list of partition constraints. A partition constraint can be a
+     * requirement to execute at a particular location or could be cardinality
+     * constraints indicating the number of instances that need to run in
+     * parallel. example, a IDatasourceAdapter implementation written for data
+     * residing on the local file system of a node cannot run on any other node
+     * and thus has a location partition constraint. The location partition
+     * constraint can be expressed as a node IP address or a node controller id.
+     * In the former case, the IP address is translated to a node controller id
+     * running on the node with the given IP address.
+     * 
+     * @Caller The wrapper operator configures its partition constraints from
+     *         the constraints obtained from the adapter factory.
+     */
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+
+    /**
+     * Creates an instance of IDatasourceAdapter.
+     * 
+     * @param HyracksTaskContext
+     * @return An instance of IDatasourceAdapter.
+     * @throws Exception
+     */
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception;
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
index a117f8a..d0f8cdd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
@@ -15,11 +15,8 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
-import java.util.Map;
 
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * A super interface implemented by a data source adapter. An adapter can be a
@@ -39,101 +36,10 @@
      *         external dataset determines the set of valid operations allowed
      *         on the dataset.
      */
-    public enum AdapterType {
-        READ,
-        WRITE,
-        READ_WRITE
-    }
+   
 
     /**
-     * Returns the type of adapter indicating if the adapter can be used for
-     * reading from an external data source or writing to an external data
-     * source or can be used for both purposes.
-     * 
-     * @Caller: Compiler: The compiler uses this API to verify if an operation
-     *          is supported by the adapter. For example, an write query against
-     *          an external dataset will not compile successfully if the
-     *          external dataset was declared with a read_only adapter.
-     * @see AdapterType
-     * @return
-     */
-    public AdapterType getAdapterType();
-
-    /**
-     * Each adapter instance is configured with a set of parameters that are
-     * key-value pairs. When creating an external or a feed dataset, an adapter
-     * instance is used in conjunction with a set of configuration parameters
-     * for the adapter instance. The configuration parameters are stored
-     * internally with the adapter and can be retrieved using this API.
-     * 
-     * @param propertyKey
-     * @return String the value corresponding to the configuration parameter
-     *         represented by the key- attributeKey.
-     */
-    public Object getAdapterProperty(String propertyKey);
-
-    /**
-     * Configures the IDatasourceAdapter instance.
-     * 
-     * @caller Scenario 1) Called during compilation of DDL statement that
-     *         creates a Feed dataset and associates the adapter with the
-     *         dataset. The (key,value) configuration parameters provided as
-     *         part of the DDL statement are collected by the compiler and
-     *         passed on to this method. The adapter may as part of
-     *         configuration connect with the external data source and determine
-     *         the IAType associated with data residing with the external
-     *         datasource.
-     *         Scenario 2) An adapter instance is created by an ASTERIX operator
-     *         that wraps around the adapter instance. The operator, as part of
-     *         its initialization invokes the configure method. The (key,value)
-     *         configuration parameters are passed on to the operator by the
-     *         compiler. Subsequent to the invocation, the wrapping operator
-     *         obtains the partition constraints (if any). In addition, in the
-     *         case of a read adapter, the wrapping operator obtains the output
-     *         ASTERIX type associated with the data that will be output from
-     *         the adapter.
-     * @param arguments
-     *            A map with key-value pairs that contains the configuration
-     *            parameters for the adapter. The arguments are obtained from
-     *            the metadata. Recall that the DDL to create an external
-     *            dataset or a feed dataset requires using an adapter and
-     *            providing all arguments as a set of (key,value) pairs. These
-     *            arguments are put into the metadata.
-     */
-    public void configure(Map<String, Object> arguments) throws Exception;
-
-    /**
-     * Returns a list of partition constraints. A partition constraint can be a
-     * requirement to execute at a particular location or could be cardinality
-     * constraints indicating the number of instances that need to run in
-     * parallel. example, a IDatasourceAdapter implementation written for data
-     * residing on the local file system of a node cannot run on any other node
-     * and thus has a location partition constraint. The location partition
-     * constraint can be expressed as a node IP address or a node controller id.
-     * In the former case, the IP address is translated to a node controller id
-     * running on the node with the given IP address.
-     * 
-     * @Caller The wrapper operator configures its partition constraints from
-     *         the constraints obtained from the adapter.
-     */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
-    /**
-     * Allows the adapter to establish connection with the external data source
-     * expressing intent for data and providing any configuration parameters
-     * required by the external data source for the transfer of data. This
-     * method does not result in any data transfer, but is a prerequisite for
-     * any subsequent data transfer to happen between the external data source
-     * and the adapter.
-     * 
-     * @caller This method is called by the wrapping ASTERIX operator that
-     * @param ctx
-     * @throws Exception
-     */
-    public void initialize(IHyracksTaskContext ctx) throws Exception;
-
-    /**
-     * Triggers the adapter to begin ingestion of data from the external source.
+     * Triggers the adapter to begin ingesting data from the external source.
      * 
      * @param partition
      *            The adapter could be running with a degree of parallelism.
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 6285385..29e3b3b 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -44,16 +44,20 @@
 
     public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
     private final Cluster cluster;
-   
+
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
-        try {
-            JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
-            Unmarshaller unmarshaller = ctx.createUnmarshaller();
-            cluster = (Cluster) unmarshaller.unmarshal(is);
-        
-        } catch (JAXBException e) {
-            throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
+        if (is != null) {
+            try {
+                JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+                Unmarshaller unmarshaller = ctx.createUnmarshaller();
+                cluster = (Cluster) unmarshaller.unmarshal(is);
+
+            } catch (JAXBException e) {
+                throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
+            }
+        } else {
+            cluster = null;
         }
     }
 
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 32b539c..81d19ae 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -18,24 +18,10 @@
 import java.io.InputStream;
 import java.util.Map;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
-import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
@@ -44,19 +30,16 @@
  * the source file has been ingested.
  */
 
-public class RateControlledFileSystemBasedAdapter extends FileSystemBasedAdapter implements ITypedDatasourceAdapter,
-        IManagedFeedAdapter {
+public class RateControlledFileSystemBasedAdapter extends FileSystemBasedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
     private FileSystemBasedAdapter coreAdapter;
-    private String format;
 
     public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
-            FileSystemBasedAdapter coreAdapter, String format) throws Exception {
-        super(atype);
-        this.configuration = configuration;
+            FileSystemBasedAdapter coreAdapter, String format, ITupleParserFactory parserFactory,
+            IHyracksTaskContext ctx) throws Exception {
+        super(parserFactory, atype, ctx);
         this.coreAdapter = coreAdapter;
-        this.format = format;
     }
 
     @Override
@@ -65,183 +48,14 @@
     }
 
     @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        coreAdapter.initialize(ctx);
-        this.ctx = ctx;
-    }
-
-    @Override
-    public void configure(Map<String, Object> arguments) throws Exception {
-        coreAdapter.configure(arguments);
-    }
-
-    @Override
-    public AdapterType getAdapterType() {
-        return coreAdapter.getAdapterType();
-    }
-
-    @Override
-    protected ITupleParser getTupleParser() throws Exception {
-        ITupleParser parser = null;
-        if (format.equals(FORMAT_DELIMITED_TEXT)) {
-            parser = getRateControlledDelimitedDataTupleParser((ARecordType) atype);
-        } else if (format.equals(FORMAT_ADM)) {
-            parser = getRateControlledADMDataTupleParser((ARecordType) atype);
-        } else {
-            throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
-        }
-        return parser;
-
-    }
-
-    protected ITupleParser getRateControlledDelimitedDataTupleParser(ARecordType recordType) throws AsterixException {
-        ITupleParser parser;
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
-            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-
-        }
-        String delimiterValue = (String) configuration.get(KEY_DELIMITER);
-        if (delimiterValue != null && delimiterValue.length() > 1) {
-            throw new AsterixException("improper delimiter");
-        }
-
-        Character delimiter = delimiterValue.charAt(0);
-        parser = new RateControlledTupleParserFactory(recordType, fieldParserFactories, delimiter, configuration)
-                .createTupleParser(ctx);
-        return parser;
-    }
-
-    protected ITupleParser getRateControlledADMDataTupleParser(ARecordType recordType) throws AsterixException {
-        ITupleParser parser = null;
-        try {
-            parser = new RateControlledTupleParserFactory(recordType, configuration).createTupleParser(ctx);
-            return parser;
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return (ARecordType) atype;
-    }
-
-    @Override
     public void alter(Map<String, Object> properties) {
-        ((RateControlledTupleParser) parser).setInterTupleInterval(Long.parseLong((String) properties
+        ((RateControlledTupleParser) tupleParser).setInterTupleInterval(Long.parseLong((String) properties
                 .get(RateControlledTupleParser.INTER_TUPLE_INTERVAL)));
     }
 
     @Override
     public void stop() {
-        ((RateControlledTupleParser) parser).stop();
+        ((RateControlledTupleParser) tupleParser).stop();
     }
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return coreAdapter.getPartitionConstraint();
-    }
-}
-
-class RateControlledTupleParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final ARecordType recordType;
-    private final IDataParser dataParser;
-    private final Map<String, Object> configuration;
-
-    public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, Object> configuration) {
-        this.recordType = recordType;
-        dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
-        this.configuration = configuration;
-    }
-
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
-        this.recordType = recordType;
-        dataParser = new ADMDataParser();
-        this.configuration = configuration;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
-        return new RateControlledTupleParser(ctx, recordType, dataParser, configuration);
-    }
-
-}
-
-class RateControlledTupleParser extends AbstractTupleParser {
-
-    private final IDataParser dataParser;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-    private boolean continueIngestion = true;
-
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
-    public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, Object> configuration) {
-        super(ctx, recType);
-        this.dataParser = dataParser;
-        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        } else {
-            interTupleInterval = 0;
-        }
-        delayConfigured = interTupleInterval != 0;
-    }
-
-    public void setInterTupleInterval(long val) {
-        this.interTupleInterval = val;
-        this.delayConfigured = val > 0;
-    }
-
-    public void stop() {
-        continueIngestion = false;
-    }
-
-    @Override
-    public IDataParser getDataParser() {
-        return dataParser;
-    }
-
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-
-        appender.reset(frame, true);
-        IDataParser parser = getDataParser();
-        try {
-            parser.initialize(in, recType, true);
-            while (continueIngestion) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                if (delayConfigured) {
-                    Thread.sleep(interTupleInterval);
-                }
-                addTupleToFrame(writer);
-            }
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame, writer);
-            }
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        } catch (InterruptedException ie) {
-            throw new HyracksDataException(ie);
-        }
-    }
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index ff3108f..16812d9 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -14,14 +14,32 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
+import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
  * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The
@@ -29,38 +47,24 @@
  * on the local file system or on HDFS. The feed ends when the content of the
  * source file has been ingested.
  */
-public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+public class RateControlledFileSystemBasedAdapterFactory extends FileSystemAdapterFactory {
     private static final long serialVersionUID = 1L;
-    
+
     public static final String KEY_FILE_SYSTEM = "fs";
     public static final String LOCAL_FS = "localfs";
     public static final String HDFS = "hdfs";
     public static final String KEY_PATH = "path";
     public static final String KEY_FORMAT = "format";
 
-    private IGenericDatasetAdapterFactory adapterFactory;
+    private IAdapterFactory adapterFactory;
     private String format;
-    private boolean setup = false;
+    private Map<String, Object> configuration;
+    private ARecordType atype;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
-        if (!setup) {
-            checkRequiredArgs(configuration);
-            String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
-            String adapterFactoryClass = null;
-            if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
-            } else if (fileSystem.equals(HDFS)) {
-                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
-            } else {
-                throw new AsterixException("Unsupported file system type " + fileSystem);
-            }
-            format = (String) configuration.get(KEY_FORMAT);
-            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
-            setup = true;
-        }
-        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
-                (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        FileSystemBasedAdapter coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(ctx);
+        return new RateControlledFileSystemBasedAdapter(atype, configuration, coreAdapter, format, parserFactory, ctx);
     }
 
     @Override
@@ -72,7 +76,7 @@
         if (configuration.get(KEY_FILE_SYSTEM) == null) {
             throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
         }
-        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+        if (configuration.get(KEY_SOURCE_DATATYPE) == null) {
             throw new Exception("Record type not specified (output-type-name=?)");
         }
         if (configuration.get(KEY_PATH) == null) {
@@ -83,4 +87,186 @@
         }
     }
 
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.GENERIC;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        checkRequiredArgs(configuration);
+        String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+        String adapterFactoryClass = null;
+        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+            adapterFactoryClass = NCFileSystemAdapterFactory.class.getName();
+        } else if (fileSystem.equals(HDFS)) {
+            adapterFactoryClass = HDFSAdapterFactory.class.getName();
+        } else {
+            throw new AsterixException("Unsupported file system type " + fileSystem);
+        }
+        format = (String) configuration.get(KEY_FORMAT);
+        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+        adapterFactory.configure(configuration);
+
+        atype = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+        configureFormat();
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return adapterFactory.getPartitionConstraint();
+    }
+
+    private void configureFormat() throws AsterixException {
+        switch (format) {
+            case FORMAT_ADM:
+                parserFactory = new RateControlledTupleParserFactory(atype, configuration);
+                break;
+
+            case FORMAT_DELIMITED_TEXT:
+                String delimiterValue = (String) configuration.get(KEY_DELIMITER);
+                if (delimiterValue != null && delimiterValue.length() > 1) {
+                    throw new AsterixException("improper delimiter");
+                }
+                IValueParserFactory[] valueParserFactories = getValueParserFactories(atype);
+                parserFactory = new RateControlledTupleParserFactory(atype, valueParserFactories,
+                        delimiterValue.charAt(0), configuration);
+                break;
+        }
+    }
+
+    protected IValueParserFactory[] getValueParserFactories(ARecordType recordType) throws AsterixException {
+        int n = recordType.getFieldTypes().length;
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+            if (vpf == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+            }
+            fieldParserFactories[i] = vpf;
+
+        }
+        return fieldParserFactories;
+    }
+
+}
+
+class RateControlledTupleParserFactory implements ITupleParserFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ARecordType recordType;
+    private final Map<String, Object> configuration;
+    private IValueParserFactory[] valueParserFactories;
+    private char delimiter;
+    private final ParserType parserType;
+
+    public enum ParserType {
+        ADM,
+        DELIMITED_DATA
+    }
+
+    public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
+            char fieldDelimiter, Map<String, Object> configuration) {
+        this.recordType = recordType;
+        this.valueParserFactories = valueParserFactories;
+        this.delimiter = fieldDelimiter;
+        this.configuration = configuration;
+        this.parserType = ParserType.DELIMITED_DATA;
+    }
+
+    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+        this.recordType = recordType;
+        this.configuration = configuration;
+        this.parserType = ParserType.ADM;
+    }
+
+    @Override
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
+        IDataParser dataParser = null;
+        switch (parserType) {
+            case ADM:
+                dataParser = new ADMDataParser();
+                break;
+            case DELIMITED_DATA:
+                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter);
+                break;
+        }
+        return new RateControlledTupleParser(ctx, recordType, dataParser, configuration);
+    }
+
+}
+
+class RateControlledTupleParser extends AbstractTupleParser {
+
+    private final IDataParser dataParser;
+    private long interTupleInterval;
+    private boolean delayConfigured;
+    private boolean continueIngestion = true;
+
+    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+
+    public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
+            Map<String, Object> configuration) {
+        super(ctx, recType);
+        this.dataParser = dataParser;
+        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
+        if (propValue != null) {
+            interTupleInterval = Long.parseLong(propValue);
+        } else {
+            interTupleInterval = 0;
+        }
+        delayConfigured = interTupleInterval != 0;
+    }
+
+    public void setInterTupleInterval(long val) {
+        this.interTupleInterval = val;
+        this.delayConfigured = val > 0;
+    }
+
+    public void stop() {
+        continueIngestion = false;
+    }
+
+    @Override
+    public IDataParser getDataParser() {
+        return dataParser;
+    }
+
+    @Override
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+
+        appender.reset(frame, true);
+        IDataParser parser = getDataParser();
+        try {
+            parser.initialize(in, recType, true);
+            while (continueIngestion) {
+                tb.reset();
+                if (!parser.parse(tb.getDataOutput())) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                if (delayConfigured) {
+                    Thread.sleep(interTupleInterval);
+                }
+                addTupleToFrame(writer);
+            }
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (AsterixException ae) {
+            throw new HyracksDataException(ae);
+        } catch (IOException ioe) {
+            throw new HyracksDataException(ioe);
+        } catch (InterruptedException ie) {
+            throw new HyracksDataException(ie);
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 63ae8f2..fe4212d 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -38,7 +38,9 @@
     private static final long serialVersionUID = 1L;
     private Map<String, Object> configuration;
 
-    public SyntheticTwitterFeedAdapter(Map<String, Object> configuration) throws AsterixException {
+    public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx)
+            throws AsterixException {
+        super(configuration, ctx);
         this.configuration = configuration;
 
         String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
@@ -58,31 +60,11 @@
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.READ;
-    }
-
-    @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
-        this.configuration = configuration;
-
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        this.ctx = ctx;
-    }
-
-    @Override
     public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
         return new SyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
     }
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
-    }
-
+  
     private static class SyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
 
         private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedClient.class.getName());
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 7832f11..48b1d20 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -16,8 +16,11 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The
@@ -25,21 +28,43 @@
  * on the local file system or on HDFS. The feed ends when the content of the
  * source file has been ingested.
  */
-public class SyntheticTwitterFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+public class SyntheticTwitterFeedAdapterFactory implements IAdapterFactory {
 
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
 
+    private Map<String, Object> configuration;
+
     @Override
     public String getName() {
         return "synthetic_twitter_feed";
     }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
-        return new SyntheticTwitterFeedAdapter(configuration);
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new SyntheticTwitterFeedAdapter(configuration, ctx);
     }
 
 }
\ No newline at end of file