checkpoint
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java
index ff600ce..c29a96e 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java
@@ -1,3 +1,17 @@
+/*
+x * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.event.util;
 
 public class AsterixConstants {
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java
index e06d66c..9af9307 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java
@@ -499,6 +499,9 @@
 
         for (Node node : cluster.getNode()) {
             nodeid = new Nodeid(new Value(null, node.getId()));
+            if (node.getLogDir() != null) {
+                pargs = node.getLogDir();
+            }
             event = new Event("file_delete", nodeid, pargs);
             patternList.add(new Pattern(null, 1, null, event));
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index bf89ccb..fa66715 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -19,7 +19,7 @@
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -81,7 +81,7 @@
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, recordType, ctx);
+        RSSFeedAdapter cnnFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
         return cnnFeedAdapter;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 054fee3..669dc61 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,49 +14,23 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HiveAdapter
  */
-@SuppressWarnings("deprecation")
 public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
     private static final long serialVersionUID = 1L;
 
-    public static final String HDFS_ADAPTER_NAME = "hdfs";
-    public static final String CLUSTER_LOCATIONS = "cluster-locations";
-    public static transient String SCHEDULER = "hdfs-scheduler";
-
-    public static final String KEY_HDFS_URL = "hdfs";
-    public static final String KEY_PATH = "path";
-    public static final String KEY_INPUT_FORMAT = "input-format";
-    public static final String INPUT_FORMAT_TEXT = "text-input-format";
-    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
-    public static final String KEY_FORMAT = "format";
-    public static final String KEY_PARSER_FACTORY = "parser";
-    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
-    public static final String FORMAT_ADM = "adm";
-
     public static final String HIVE_DATABASE = "database";
     public static final String HIVE_TABLE = "table";
     public static final String HIVE_HOME = "hive-home";
@@ -64,30 +38,19 @@
     public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
     public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
 
-    private String[] readSchedule;
-    private boolean executed[];
-    private InputSplitsFactory inputSplitsFactory;
-    private ConfFactory confFactory;
-    private transient AlgebricksPartitionConstraint clusterLocations;
+    private HDFSAdapterFactory hdfsAdapterFactory;
+    private HDFSAdapter hdfsAdapter;
     private boolean configured = false;
     private IAType atype;
 
-    private static final Map<String, String> formatClassNames = initInputFormatMap();
-
-    private static Map<String, String> initInputFormatMap() {
-        Map<String, String> formatClassNames = new HashMap<String, String>();
-        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
-        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
-        return formatClassNames;
+    public HiveAdapterFactory() {
+        hdfsAdapterFactory = new HDFSAdapterFactory();
     }
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        JobConf conf = confFactory.getConf();
-        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
-        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations,
-                nodeName, parserFactory, ctx);
+        hdfsAdapter = (HDFSAdapter) hdfsAdapterFactory.createAdapter(ctx, partition);
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, hdfsAdapter, parserFactory, ctx);
         return hiveAdapter;
     }
 
@@ -109,32 +72,12 @@
     @Override
     public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         if (!configured) {
-            /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
-            configureJobConf(configuration);
-            JobConf conf = configureJobConf(configuration);
-            confFactory = new ConfFactory(conf);
-
-            clusterLocations = AsterixClusterProperties.INSTANCE.getClusterLocations();
-            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
-
-            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
-            inputSplitsFactory = new InputSplitsFactory(inputSplits);
-
-            Scheduler scheduler = HDFSAdapterFactory.hdfsScheduler;
-            readSchedule = scheduler.getLocationConstraints(inputSplits);
-            executed = new boolean[readSchedule.length];
-            Arrays.fill(executed, false);
-
-            atype = (IAType) outputType;
-            configureFormat(atype);
-            configured = true;
+            populateConfiguration(configuration);
+            hdfsAdapterFactory.configure(configuration, outputType);
         }
-
     }
 
-    private JobConf configureJobConf(Map<String, String> configuration) throws Exception {
-        JobConf conf = new JobConf();
-
+    private void populateConfiguration(Map<String, String> configuration) throws Exception {
         /** configure hive */
         String database = (String) configuration.get(HIVE_DATABASE);
         String tablePath = null;
@@ -154,21 +97,11 @@
             throw new IllegalArgumentException("file input format"
                     + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
         }
-
-        /** configure hdfs */
-        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
-        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
-        conf.set("mapred.input.format.class",
-                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
-        return conf;
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        // TODO Auto-generated method stub
-        return null;
+        return hdfsAdapterFactory.getPartitionConstraint();
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
deleted file mode 100644
index 017b511..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * An Adapter that provides the functionality of fetching news feed from CNN service
- * The Adapter provides news feed as ADM records.
- */
-public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    public CNNFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
-            throws AsterixException {
-        super(configuration, recordType, ctx);
-    }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e694e2..1a046a5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index f4ff44e..6280635 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -17,11 +17,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -29,26 +25,16 @@
 /**
  * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
  */
-@SuppressWarnings("deprecation")
 public class HiveAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String HIVE_DATABASE = "database";
-    public static final String HIVE_TABLE = "table";
-    public static final String HIVE_HOME = "hive-home";
-    public static final String HIVE_METASTORE_URI = "metastore-uri";
-    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
-    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
-
     private HDFSAdapter hdfsAdapter;
 
-    public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
-            AlgebricksPartitionConstraint clusterLocations, String nodeName, ITupleParserFactory parserFactory,
-            IHyracksTaskContext ctx) throws HyracksDataException {
+    public HiveAdapter(IAType atype, HDFSAdapter hdfsAdapter, ITupleParserFactory parserFactory, IHyracksTaskContext ctx)
+            throws HyracksDataException {
         super(parserFactory, atype, ctx);
-        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName, parserFactory,
-                ctx);
+        this.hdfsAdapter = hdfsAdapter;
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index 2e79679..be3a2fd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -39,14 +39,4 @@
      */
     public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
 
-    /**
-     * Provides logic for any corrective action that feed client needs to execute on
-     * encountering an exception.
-     * 
-     * @param e
-     *            The exception encountered during fetching of data from external source
-     * @throws AsterixException
-     */
-    public void resetOnFailure(Exception e) throws AsterixException;
-
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 3f3ca50..193cce4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -20,9 +20,8 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import edu.uci.ics.asterix.metadata.feeds.AbstractFeedDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
+import edu.uci.ics.asterix.metadata.feeds.IPullBasedFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -36,8 +35,7 @@
  * the common logic for obtaining bytes from an external source and packing them
  * into frames as tuples.
  */
-public abstract class PullBasedAdapter extends AbstractFeedDatasourceAdapter implements IDatasourceAdapter,
-        IFeedAdapter {
+public abstract class PullBasedAdapter implements IPullBasedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
@@ -55,6 +53,16 @@
     private final IHyracksTaskContext ctx;
     private int frameTupleCount = 0;
 
+    protected FeedPolicyEnforcer policyEnforcer;
+
+    public FeedPolicyEnforcer getPolicyEnforcer() {
+        return policyEnforcer;
+    }
+
+    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+        this.policyEnforcer = policyEnforcer;
+    }
+
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
     public PullBasedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
@@ -112,7 +120,6 @@
                     failureException.printStackTrace();
                     boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
                     if (continueIngestion) {
-                        pullBasedFeedClient.resetOnFailure(failureException);
                         tupleBuilder.reset();
                         continue;
                     } else {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 5aafef4..e9cc21b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -49,4 +49,9 @@
         return recordType;
     }
 
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PULL;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 0cd14b8..75f8fba 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -17,7 +17,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.logging.Logger;
 
 import twitter4j.Query;
 import twitter4j.QueryResult;
@@ -25,7 +24,6 @@
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterFactory;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import edu.uci.ics.asterix.om.base.AMutableRecord;
 import edu.uci.ics.asterix.om.base.AMutableString;
@@ -51,8 +49,6 @@
     private ARecordType recordType;
     private int nextTweetIndex = 0;
 
-    private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterFeedClient.class.getName());
-
     public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
         twitter = new TwitterFactory().getInstance();
         mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
@@ -92,11 +88,6 @@
         return InflowState.DATA_AVAILABLE;
     }
 
-    @Override
-    public void resetOnFailure(Exception e) throws AsterixException {
-        // TOOO: implement resetting logic for Twitter
-    }
-
     private void initialize(Map<String, String> params) {
         this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 6ace5c5..4eea034 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -30,16 +30,14 @@
 
     private static final long serialVersionUID = 1L;
 
+    private static final String KEY_RSS_URL = "rss_url";
+
     private List<String> feedURLs = new ArrayList<String>();
-    private boolean isStopRequested = false;
     private String id_prefix = "";
 
     private IPullBasedFeedClient rssFeedClient;
-    private ARecordType recordType;
 
-    public boolean isStopRequested() {
-        return isStopRequested;
-    }
+    private ARecordType recordType;
 
     public RSSFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
             throws AsterixException {
@@ -48,15 +46,6 @@
         this.recordType = recordType;
     }
 
-    public void setStopRequested(boolean isStopRequested) {
-        this.isStopRequested = isStopRequested;
-    }
-
-    @Override
-    public void stop() {
-        isStopRequested = true;
-    }
-
     private void initializeFeedURLs(String rssURLProperty) {
         feedURLs.clear();
         String[] feedURLProperty = rssURLProperty.split(",");
@@ -66,7 +55,7 @@
     }
 
     protected void reconfigure(Map<String, String> arguments) {
-        String rssURLProperty = configuration.get("KEY_RSS_URL");
+        String rssURLProperty = configuration.get(KEY_RSS_URL);
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
@@ -84,4 +73,9 @@
         return recordType;
     }
 
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PULL;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 0522d42..41ed923 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -18,7 +18,6 @@
 import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 
 import com.sun.syndication.feed.synd.SyndEntryImpl;
@@ -42,7 +41,6 @@
 @SuppressWarnings("rawtypes")
 public class RSSFeedClient extends PullBasedFeedClient {
 
-    private final String feedURL;
     private long id = 0;
     private String idPrefix;
     private boolean feedModified = false;
@@ -67,9 +65,8 @@
     }
 
     public RSSFeedClient(RSSFeedAdapter adapter, String feedURL, String id_prefix) throws MalformedURLException {
-        this.feedURL = feedURL;
         this.idPrefix = id_prefix;
-        feedUrl = new URL(feedURL);
+        this.feedUrl = new URL(feedURL);
         feedInfoCache = HashMapFeedInfoCache.getInstance();
         fetcher = new HttpURLFeedFetcher(feedInfoCache);
         listener = new FetcherEventListenerImpl(this);
@@ -132,12 +129,6 @@
         }
     }
 
-    @Override
-    public void resetOnFailure(Exception e) {
-        // TODO Auto-generated method stub
-
-    }
-
 }
 
 class FetcherEventListenerImpl implements FetcherListener {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index 1871111..f09a841 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -5,7 +5,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -20,14 +19,11 @@
 
     protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapter.class.getName());
 
-    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
     public abstract InputStream getInputStream(int partition) throws IOException;
 
     protected final ITupleParser tupleParser;
+
     protected final IAType sourceDatatype;
-    protected IHyracksTaskContext ctx;
-    protected AdapterRuntimeManager runtimeManager;
 
     public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
             throws HyracksDataException {
@@ -46,4 +42,5 @@
             }
         }
     }
+
 }
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
index 63d2f33..5b024ec 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
@@ -16,9 +16,6 @@
 
 import java.io.File;
 
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
-
 import org.kohsuke.args4j.Option;
 
 import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
index 53ec136..8461518 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.asterix.event.service.AsterixEventService;
 import edu.uci.ics.asterix.event.service.ILookupService;
 import edu.uci.ics.asterix.event.service.ServiceProvider;
-import edu.uci.ics.asterix.event.util.PatternCreator;
 import edu.uci.ics.asterix.installer.command.CommandHandler;
 import edu.uci.ics.asterix.installer.schema.conf.Configuration;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
deleted file mode 100644
index 2dc7c56..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-public abstract class AbstractFeedDatasourceAdapter implements IDatasourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    protected FeedPolicyEnforcer policyEnforcer;
-
-    public FeedPolicyEnforcer getPolicyEnforcer() {
-        return policyEnforcer;
-    }
-
-    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
-        this.policyEnforcer = policyEnforcer;
-    }
-
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 3dbd2e2..24c6281 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
 import edu.uci.ics.asterix.common.feeds.IFeedManager;
 import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter.DataExchangeMode;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -79,8 +80,8 @@
                 adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, feedFrameWriter, partition, inbox,
                         feedManager);
 
-                if (adapter instanceof AbstractFeedDatasourceAdapter) {
-                    ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+                if (adapter.getDataExchangeMode().equals(DataExchangeMode.PULL)) {
+                    ((IPullBasedFeedAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
                 }
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Beginning new feed:" + feedId);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
index f525420..55abd73 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
@@ -20,6 +20,16 @@
  */
 public interface IFeedAdapter extends IDatasourceAdapter {
 
+    public enum DataExchangeMode {
+        PULL,
+        PUSH
+    }
+
+    /**
+     * @return
+     */
+    public DataExchangeMode getDataExchangeMode();
+
     /**
      * Discontinue the ingestion of data and end the feed.
      * 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
new file mode 100644
index 0000000..50641b0
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
@@ -0,0 +1,28 @@
+/*
+x * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+public interface IPullBasedFeedAdapter extends IFeedAdapter {
+
+    /**
+     * @return
+     */
+    public FeedPolicyEnforcer getPolicyEnforcer();
+
+    /**
+     * @param feedPolicyEnforcer
+     */
+    public void setFeedPolicyEnforcer(FeedPolicyEnforcer feedPolicyEnforcer);
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index b6f693a..ce22887 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -1,3 +1,17 @@
+/*
+x * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.tools.external.data;
 
 import java.nio.CharBuffer;
@@ -7,8 +21,6 @@
 import java.util.List;
 import java.util.Random;
 
-import edu.uci.ics.asterix.tools.external.data.TweetGenerator.DataMode;
-
 public class DataGenerator {
 
     private RandomDateGenerator randDateGen;
@@ -32,20 +44,13 @@
     public class TweetMessageIterator implements Iterator<TweetMessage> {
 
         private final int duration;
-        private final GULongIDGenerator[] idGens;
+        private final GULongIDGenerator idGen;
         private long startTime = 0;
-        private int uidGenInUse = 0;
-        private TweetMessage dummyMessage;
-        private DataMode dataMode;
 
-        public TweetMessageIterator(int duration, GULongIDGenerator[] idGens, DataMode dataMode) {
+        public TweetMessageIterator(int duration, GULongIDGenerator idGen) {
             this.duration = duration;
-            this.idGens = idGens;
+            this.idGen = idGen;
             this.startTime = System.currentTimeMillis();
-            if (dataMode.equals(DataMode.REUSE_DATA)) {
-                dummyMessage = next();
-            }
-            this.dataMode = dataMode;
         }
 
         @Override
@@ -56,32 +61,18 @@
         @Override
         public TweetMessage next() {
             TweetMessage msg = null;
-            switch (dataMode) {
-                case NEW_DATA:
-                    getTwitterUser(null);
-                    Message message = randMessageGen.getNextRandomMessage();
-                    Point location = randLocationGen.getRandomPoint();
-                    DateTime sendTime = randDateGen.getNextRandomDatetime();
-                    twMessage.reset(idGens[uidGenInUse].getNextULong(), twUser, location, sendTime,
-                            message.getReferredTopics(), message);
-                    msg = twMessage;
-                    break;
-                case REUSE_DATA:
-                    dummyMessage.setTweetid(idGens[uidGenInUse].getNextULong());
-                    msg = dummyMessage;
-                    break;
-            }
+            getTwitterUser(null);
+            Message message = randMessageGen.getNextRandomMessage();
+            Point location = randLocationGen.getRandomPoint();
+            DateTime sendTime = randDateGen.getNextRandomDatetime();
+            twMessage.reset(idGen.getNextULong(), twUser, location, sendTime, message.getReferredTopics(), message);
+            msg = twMessage;
             return msg;
         }
 
         @Override
         public void remove() {
             // TODO Auto-generated method stub
-
-        }
-
-        public void toggleUidKeySpace() {
-            uidGenInUse = (uidGenInUse + 1) % idGens.length;
         }
 
     }
@@ -130,10 +121,10 @@
         public RandomDateGenerator(Date startDate, Date endDate) {
             this.startDate = startDate;
             this.endDate = endDate;
-            yearDifference = endDate.getYear() - startDate.getYear() + 1;
-            workingDate = new Date();
-            recentDate = new Date();
-            dateTime = new DateTime();
+            this.yearDifference = endDate.getYear() - startDate.getYear() + 1;
+            this.workingDate = new Date();
+            this.recentDate = new Date();
+            this.dateTime = new DateTime();
         }
 
         public Date getStartDate() {
@@ -1167,4 +1158,5 @@
             "Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
             "Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
             "zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
+
 }
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index 278565a..20b9be1 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -5,7 +5,6 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
@@ -19,8 +18,6 @@
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
-
     private SocketFeedServer socketFeedServer;
 
     public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputtype, int port,
@@ -89,4 +86,8 @@
         socketFeedServer.stop();
     }
 
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 4fd0453..dbe3611 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2012 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -28,7 +28,6 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -102,11 +101,11 @@
             mode = Mode.valueOf(modeValue.trim().toUpperCase());
         }
         String socketsValue = configuration.get(KEY_SOCKETS);
-        Map<String, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
-        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
         if (socketsValue == null) {
             throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adaptor configuration");
         }
+        Map<String, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
         String[] socketsArray = socketsValue.split(",");
         Random random = new Random();
         for (String socket : socketsArray) {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index a92cb55..8f169f2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -51,5 +51,10 @@
     public void stop() {
         ((RateControlledTupleParser) tupleParser).stop();
     }
+    
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PULL;
+    }
 
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 28ff14d..9c390c1 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -1,9 +1,25 @@
+/*
+x * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.tools.external.data;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
 import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
@@ -11,19 +27,17 @@
 
 public class TweetGenerator {
 
-    public static final String NUM_KEY_SPACES = "num-key-spaces";
+    private static Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
     public static final String KEY_DURATION = "duration";
     public static final String KEY_TPS = "tps";
-    public static final String KEY_TPUT_DURATION = "tput-duration";
     public static final String KEY_GUID_SEED = "guid-seed";
-    public static final String KEY_FRAME_WRITER_MODE = "frame-writer-mode";
-    public static final String KEY_DATA_MODE = "data-mode";
 
     public static final String OUTPUT_FORMAT = "output-format";
     public static final String OUTPUT_FORMAT_ARECORD = "arecord";
     public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
 
-    private static final int DEFAULT_DURATION = 60;
+    private static final int DEFAULT_DURATION = 60; //seconds
     private static final int DEFAULT_GUID_SEED = 0;
 
     private int duration;
@@ -35,69 +49,35 @@
     private OutputStream os;
     private DataGenerator dataGenerator = null;
     private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
-    private GULongIDGenerator[] uidGenerators;
-    private int numUidGenerators;
-    private FrameWriterMode frameWriterMode;
-    private DataMode dataMode;
+    private GULongIDGenerator uidGenerator;
 
     public int getTweetCount() {
         return tweetCount;
     }
 
-    public enum DataMode {
-        REUSE_DATA,
-        NEW_DATA
-    }
-
-    public enum FrameWriterMode {
-        DUMMY_NO_PARSING,
-        PARSING
-    }
-
     public TweetGenerator(Map<String, String> configuration, int partition, String format, OutputStream os)
             throws Exception {
         this.partition = partition;
         String value = configuration.get(KEY_DURATION);
-        duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
-
-        value = configuration.get(KEY_DATA_MODE);
-        dataMode = value != null ? DataMode.valueOf(value) : DataMode.NEW_DATA;
-        numUidGenerators = configuration.get(NUM_KEY_SPACES) != null ? Integer.parseInt(configuration
-                .get(NUM_KEY_SPACES)) : 1;
-        uidGenerators = new GULongIDGenerator[numUidGenerators];
-
+        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
         int guidSeed = configuration.get(KEY_GUID_SEED) != null ? Integer.parseInt(configuration.get(KEY_GUID_SEED))
                 : DEFAULT_GUID_SEED;
-
-        for (int i = 0; i < uidGenerators.length; i++) {
-            uidGenerators[i] = new GULongIDGenerator(partition, (byte) (i + guidSeed));
-        }
-
-        InitializationInfo info = new InitializationInfo();
-        dataGenerator = new DataGenerator(info);
-        value = configuration.get(KEY_FRAME_WRITER_MODE);
-        frameWriterMode = value != null ? FrameWriterMode.valueOf(value.toUpperCase()) : FrameWriterMode.PARSING;
-        dataMode = configuration.get(KEY_DATA_MODE) != null ? DataMode.valueOf(configuration.get(KEY_DATA_MODE))
-                : DataMode.NEW_DATA;
-        tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerators, dataMode);
+        uidGenerator = new GULongIDGenerator(partition, (byte) (guidSeed));
+        dataGenerator = new DataGenerator(new InitializationInfo());
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerator);
         this.os = os;
     }
 
     private void writeTweetString(TweetMessage tweetMessage) throws IOException {
         String tweet = tweetMessage.toString() + "\n";
         tweetCount++;
-        if (frameWriterMode.equals(FrameWriterMode.PARSING)) {
-            byte[] b = tweet.getBytes();
-            if (outputBuffer.position() + b.length > outputBuffer.limit()) {
-                flush();
-                numFlushedTweets += frameTweetCount;
-                frameTweetCount = 0;
-                outputBuffer.put(b);
-                frameTweetCount++;
-            } else {
-                outputBuffer.put(b);
-                frameTweetCount++;
-            }
+        byte[] b = tweet.getBytes();
+        if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+            flush();
+            numFlushedTweets += frameTweetCount;
+            frameTweetCount = 0;
+            outputBuffer.put(b);
+            frameTweetCount++;
         }
     }
 
@@ -110,13 +90,14 @@
         os.write(outputBuffer.array(), 0, outputBuffer.limit());
         outputBuffer.position(0);
         outputBuffer.limit(32 * 1024);
-        tweetIterator.toggleUidKeySpace();
     }
 
     public boolean setNextRecordBatch(int numTweetsInBatch) throws Exception {
         boolean moreData = tweetIterator.hasNext();
         if (!moreData) {
-            System.out.println("TWEET COUNT: [" + partition + "]" + tweetCount);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+            }
             return false;
         } else {
             int count = 0;
@@ -127,5 +108,4 @@
             return true;
         }
     }
-
 }
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 7bbfae1..9e3c4dd 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -1,3 +1,17 @@
+/*
+x * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.tools.external.data;
 
 import java.io.IOException;
@@ -54,7 +68,7 @@
         return inputStream;
     }
 
-    private static class TwitterServer {
+    public static class TwitterServer {
         private final DataProvider dataProvider;
         private final ExecutorService executorService;
 
@@ -74,7 +88,7 @@
 
     }
 
-    private static class DataProvider implements Runnable {
+    public static class DataProvider implements Runnable {
 
         public static final String KEY_MODE = "mode";
 
@@ -116,29 +130,30 @@
             long startBatch;
             long endBatch;
 
-            while (true) {
-                try {
-                    while (moreData && continuePush) {
-                        switch (mode) {
-                            case AGGRESSIVE:
-                                moreData = tweetGenerator.setNextRecordBatch(batchSize);
-                                break;
-                            case CONTROLLED:
-                                startBatch = System.currentTimeMillis();
-                                moreData = tweetGenerator.setNextRecordBatch(batchSize);
-                                endBatch = System.currentTimeMillis();
-                                if (endBatch - startBatch < 1000) {
-                                    Thread.sleep(1000 - (endBatch - startBatch));
+            try {
+                while (moreData && continuePush) {
+                    switch (mode) {
+                        case AGGRESSIVE:
+                            moreData = tweetGenerator.setNextRecordBatch(batchSize);
+                            break;
+                        case CONTROLLED:
+                            startBatch = System.currentTimeMillis();
+                            moreData = tweetGenerator.setNextRecordBatch(batchSize);
+                            endBatch = System.currentTimeMillis();
+                            if (endBatch - startBatch < 1000) {
+                                Thread.sleep(1000 - (endBatch - startBatch));
+                            } else {
+                                if (LOGGER.isLoggable(Level.WARNING)) {
+                                    LOGGER.warning("Unable to reach the required tps of " + batchSize);
                                 }
-                                break;
-                        }
+                            }
+                            break;
                     }
-                    os.close();
-                    break;
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in adaptor " + e.getMessage());
-                    }
+                }
+                os.close();
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Exception in adaptor " + e.getMessage());
                 }
             }
         }
@@ -154,5 +169,9 @@
         twitterServer.stop();
     }
 
-   
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
 }
\ No newline at end of file