modified PullBasedAdaptor to account for the case when data is not available at the external data source for sufficiently longer duration of time
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index f0e51b2..09cf5e5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -32,6 +32,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.StringUtils;
+
 import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
 import edu.uci.ics.asterix.api.common.SessionConfig;
 import edu.uci.ics.asterix.aql.base.Statement;
@@ -41,6 +43,7 @@
 import edu.uci.ics.asterix.aql.expression.Identifier;
 import edu.uci.ics.asterix.aql.translator.AqlTranslator;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
 import edu.uci.ics.asterix.event.schema.cluster.Cluster;
@@ -353,80 +356,52 @@
                 Map<String, String> feedActivityDetails = new HashMap<String, String>();
                 StringBuilder ingestLocs = new StringBuilder();
                 for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
-                    feedInfo.ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
+                    Map<Integer, String> operatorLocations = info.getOperatorLocations().get(ingestOpId);
+                    int nOperatorInstances = operatorLocations.size();
+                    for (int i = 0; i < nOperatorInstances; i++) {
+                        feedInfo.ingestLocations.add(operatorLocations.get(i));
+                    }
                 }
                 StringBuilder computeLocs = new StringBuilder();
                 for (OperatorDescriptorId computeOpId : computeOperatorIds) {
-                    List<String> locations = info.getOperatorLocations().get(computeOpId);
-                    if (locations != null) {
-                        feedInfo.computeLocations.addAll(locations);
+                    Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
+                    if (operatorLocations != null) {
+                        int nOperatorInstances = operatorLocations.size();
+                        for (int i = 0; i < nOperatorInstances; i++) {
+                            feedInfo.computeLocations.add(operatorLocations.get(i));
+                        }
                     } else {
                         feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
                     }
                 }
+
                 StringBuilder storageLocs = new StringBuilder();
                 for (OperatorDescriptorId storageOpId : storageOperatorIds) {
-                    feedInfo.storageLocations.addAll(info.getOperatorLocations().get(storageOpId));
+                    Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
+                    int nOperatorInstances = operatorLocations.size();
+                    for (int i = 0; i < nOperatorInstances; i++) {
+                        feedInfo.storageLocations.add(operatorLocations.get(i));
+                    }
                 }
 
-                for (String ingestLoc : feedInfo.ingestLocations) {
-                    ingestLocs.append(ingestLoc);
-                    ingestLocs.append(",");
-                }
-                if (ingestLocs.length() > 1) {
-                    ingestLocs.deleteCharAt(ingestLocs.length() - 1);
-                }
-                for (String computeLoc : feedInfo.computeLocations) {
-                    computeLocs.append(computeLoc);
-                    computeLocs.append(",");
-                }
-                if (computeLocs.length() > 1) {
-                    computeLocs.deleteCharAt(computeLocs.length() - 1);
-                }
-                for (String storageLoc : feedInfo.storageLocations) {
-                    storageLocs.append(storageLoc);
-                    storageLocs.append(",");
-                }
-                if (storageLocs.length() > 1) {
-                    storageLocs.deleteCharAt(storageLocs.length() - 1);
-                }
+                ingestLocs.append(StringUtils.join(feedInfo.ingestLocations, ","));
+                computeLocs.append(StringUtils.join(feedInfo.computeLocations, ","));
+                storageLocs.append(StringUtils.join(feedInfo.storageLocations, ","));
 
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME,
-                        feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+                String policyName = feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
 
-                int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
-                String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
-
-                Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
-                String instanceName = cluster.getInstanceName();
-                String node = superFeedManagerHost.substring(instanceName.length() + 1);
-                String hostIp = null;
-                for (Node n : cluster.getNode()) {
-                    if (n.getId().equals(node)) {
-                        hostIp = n.getClusterIp();
-                        break;
+                FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedInfo.feedPolicy);
+                if (policyAccessor.collectStatistics() || policyAccessor.isElastic()) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Feed " + feedInfo.feedConnectionId + " requires Super Feed Manager");
                     }
-                }
-                if (hostIp == null) {
-                    throw new IllegalStateException("Unknown node " + superFeedManagerHost);
+                    configureSuperFeedManager(feedInfo, feedActivityDetails);
                 }
 
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, ""
-                        + superFeedManagerPort);
-
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
-                            + superFeedManagerHost);
-                }
-
-                FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
-                        superFeedManagerPort, feedInfo.feedConnectionId);
-                superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
-                messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
                 MetadataManager.INSTANCE.acquireWriteLatch();
                 MetadataTransactionContext mdTxnCtx = null;
                 try {
@@ -450,6 +425,41 @@
 
         }
 
+        private void configureSuperFeedManager(FeedInfo feedInfo, Map<String, String> feedActivityDetails) {
+            // TODO Auto-generated method stub
+            int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
+            String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
+
+            Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
+            String instanceName = cluster.getInstanceName();
+            String node = superFeedManagerHost.substring(instanceName.length() + 1);
+            String hostIp = null;
+            for (Node n : cluster.getNode()) {
+                if (n.getId().equals(node)) {
+                    hostIp = n.getClusterIp();
+                    break;
+                }
+            }
+            if (hostIp == null) {
+                throw new IllegalStateException("Unknown node " + superFeedManagerHost);
+            }
+
+            feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
+            feedActivityDetails
+                    .put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, "" + superFeedManagerPort);
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
+                        + superFeedManagerHost);
+            }
+
+            FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
+                    superFeedManagerPort, feedInfo.feedConnectionId);
+            superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
+            messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
+
+        }
+
         private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
             MetadataManager.INSTANCE.acquireWriteLatch();
             MetadataTransactionContext mdTxnCtx = null;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index ee28c3a..2e79679 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.asterix.external.dataset.adapter;
 
 import java.io.DataOutput;
-import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 
@@ -28,15 +27,17 @@
     }
 
     /**
-     * Writes the next fetched tuple into the provided instance of DatatOutput.
+     * Writes the next fetched tuple into the provided instance of DatatOutput. Invocation of this method blocks until
+     * a new tuple has been written or the specified time has expired.
      * 
      * @param dataOutput
      *            The receiving channel for the feed client to write ADM records to.
-     * @return true if a record was written to the DataOutput instance
-     *         false if no record was written to the DataOutput instance indicating non-availability of new data.
+     * @param timeout
+     *            Threshold time (expressed in seconds) for the next tuple to be obtained from the externa source.
+     * @return
      * @throws AsterixException
      */
-    public InflowState nextTuple(DataOutput dataOutput) throws AsterixException;
+    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
 
     /**
      * Provides logic for any corrective action that feed client needs to execute on
@@ -48,11 +49,4 @@
      */
     public void resetOnFailure(Exception e) throws AsterixException;
 
-    /**
-     * @param configuration
-     */
-    public boolean alter(Map<String, String> configuration);
-
-    public void stop();
-
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e2a4b76..3f3ca50 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -41,6 +41,7 @@
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
+    private static final int timeout = 5; // seconds
 
     protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
     protected IPullBasedFeedClient pullBasedFeedClient;
@@ -52,6 +53,7 @@
     private ByteBuffer frame;
     private long tupleCount = 0;
     private final IHyracksTaskContext ctx;
+    private int frameTupleCount = 0;
 
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
@@ -72,24 +74,36 @@
 
         pullBasedFeedClient = getFeedClient(partition);
         InflowState inflowState = null;
+
         while (continueIngestion) {
             tupleBuilder.reset();
             try {
-                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput());
+                // blocking call
+                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
                 switch (inflowState) {
                     case DATA_AVAILABLE:
                         tupleBuilder.addFieldEndOffset();
                         appendTupleToFrame(writer);
-                        tupleCount++;
+                        frameTupleCount++;
                         break;
                     case NO_MORE_DATA:
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("Reached end of feed");
                         }
                         FrameUtils.flushFrame(frame, writer);
+                        tupleCount += frameTupleCount;
+                        frameTupleCount = 0;
                         continueIngestion = false;
                         break;
                     case DATA_NOT_AVAILABLE:
+                        if (frameTupleCount > 0) {
+                            FrameUtils.flushFrame(frame, writer);
+                            tupleCount += frameTupleCount;
+                            frameTupleCount = 0;
+                        }
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Timed out on obtaining data from pull based adaptor. Trying again!");
+                        }
                         break;
                 }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index 8efe919..e728787 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -16,6 +16,8 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.builders.IARecordBuilder;
 import edu.uci.ics.asterix.builders.RecordBuilder;
@@ -45,6 +47,8 @@
 
 public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
 
+    protected static final Logger LOGGER = Logger.getLogger(PullBasedFeedClient.class.getName());
+
     protected ARecordSerializerDeserializer recordSerDe;
     protected AMutableRecord mutableRecord;
     protected boolean messageReceived;
@@ -69,28 +73,36 @@
     public abstract InflowState setNextRecord() throws Exception;
 
     @Override
-    public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
+    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
         try {
-            System.out.println("Setting next record");
-            InflowState state = setNextRecord();
-            boolean first = true;
-            switch (state) {
-                case DATA_AVAILABLE:
-                    IAType t = mutableRecord.getType();
-                    ATypeTag tag = t.getTypeTag();
-                    dataOutput.writeByte(tag.serialize());
-                    if (first) {
+            InflowState state = null;
+            int waitCount = 0;
+            boolean continueWait = true;
+            while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
+                state = setNextRecord();
+                switch (state) {
+                    case DATA_AVAILABLE:
+                        IAType t = mutableRecord.getType();
+                        ATypeTag tag = t.getTypeTag();
+                        dataOutput.writeByte(tag.serialize());
                         recordBuilder.reset(mutableRecord.getType());
-                        first = false;
-                    }
-                    recordBuilder.init();
-                    writeRecord(mutableRecord, dataOutput, recordBuilder);
-                    break;
-
-                case DATA_NOT_AVAILABLE:
-                    break;
-                case NO_MORE_DATA:
-                    break;
+                        recordBuilder.init();
+                        writeRecord(mutableRecord, dataOutput, recordBuilder);
+                        break;
+                    case DATA_NOT_AVAILABLE:
+                        if (waitCount > timeout) {
+                            continueWait = false;
+                        } else {
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Waiting to obtaing data from pull based adaptor");
+                            }
+                            Thread.sleep(1000);
+                            waitCount++;
+                        }
+                        break;
+                    case NO_MORE_DATA:
+                        break;
+                }
             }
             return state;
         } catch (Exception e) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 019d1b7..5aafef4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -33,7 +33,6 @@
     public static final String INTERVAL = "interval";
 
     private ARecordType recordType;
-    private final IHyracksTaskContext ctx;
     private PullBasedTwitterFeedClient tweetClient;
 
     @Override
@@ -43,15 +42,9 @@
 
     public PullBasedTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) throws AsterixException {
         super(configuration, ctx);
-        this.ctx = ctx;
         tweetClient = new PullBasedTwitterFeedClient(ctx, this);
     }
 
-    @Override
-    public void stop() {
-        tweetClient.stop();
-    }
-
     public ARecordType getAdapterOutputType() {
         return recordType;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 7a5aeea..0cd14b8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -42,7 +42,6 @@
 
     private String keywords;
     private Query query;
-    private String id_prefix;
     private Twitter twitter;
     private int requestInterval = 10; // seconds
     private QueryResult result;
@@ -55,7 +54,6 @@
     private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterFeedClient.class.getName());
 
     public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
-        this.id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
         twitter = new TwitterFactory().getInstance();
         mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
                 new AMutableString(null), new AMutableString(null) };
@@ -99,17 +97,6 @@
         // TOOO: implement resetting logic for Twitter
     }
 
-    @Override
-    public boolean alter(Map<String, String> configuration) {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public void stop() {
-        // TODO Auto-generated method stub
-    }
-
     private void initialize(Map<String, String> params) {
         this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 8a4b301..0522d42 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -114,7 +114,6 @@
     @SuppressWarnings("unchecked")
     private void fetchFeed() {
         try {
-            System.err.println("Retrieving feed " + feedURL);
             // Retrieve the feed.
             // We will get a Feed Polled Event and then a
             // Feed Retrieved event (assuming the feed is valid)
@@ -139,18 +138,6 @@
 
     }
 
-    @Override
-    public boolean alter(Map<String, String> configuration) {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public void stop() {
-        // TODO Auto-generated method stub
-
-    }
-
 }
 
 class FetcherEventListenerImpl implements FetcherListener {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index f89a7ff..1871111 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -2,6 +2,8 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
@@ -16,6 +18,8 @@
 
     private static final long serialVersionUID = 1L;
 
+    protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapter.class.getName());
+
     public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
 
     public abstract InputStream getInputStream(int partition) throws IOException;
@@ -34,7 +38,12 @@
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
         InputStream in = getInputStream(partition);
-        tupleParser.parse(in, writer);
+        if (in != null) {
+            tupleParser.parse(in, writer);
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Could not obtain input stream for parsing from adaptor " + this + "[" + partition + "]");
+            }
+        }
     }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index 1380d9e..b9a5e73 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -138,6 +138,7 @@
                 adapter.start(partition, writer);
                 runtimeManager.setState(State.FINISHED_INGESTION);
             } catch (Exception e) {
+                e.printStackTrace();
                 if (LOGGER.isLoggable(Level.SEVERE)) {
                     LOGGER.severe("Exception during feed ingestion " + e.getMessage());
                 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index da5fdde..fb55749 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -35,7 +35,7 @@
     public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_MONITORED,
             FAULT_TOLERANT_BASIC_MONITORED, ELASTIC };
 
-    public static final FeedPolicy DEFAULT_POLICY = FAULT_TOLERANT_BASIC_MONITORED;
+    public static final FeedPolicy DEFAULT_POLICY = BASIC;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index d3693bf..3297c2d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -244,7 +244,12 @@
                     ((ITypedAdapterFactory) adapterFactory).configure(configuration);
                     break;
                 case GENERIC:
-                    String outputTypeName = configuration.get("output-type-name");
+                    String outputTypeName = configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
+                    if (outputTypeName == null) {
+                        throw new IllegalArgumentException(
+                                "You must specify the datatype associated with the incoming data. Datatype is specified by the "
+                                        + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
+                    }
                     adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
                             feed.getDataverseName(), outputTypeName).getDatatype();
                     ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
index 54613d0..16c3c80 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
@@ -20,6 +20,8 @@
 
 public interface IGenericAdapterFactory extends IAdapterFactory {
 
+    public static final String KEY_TYPE_NAME = "type-name";
+
     public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
 
 }
diff --git a/asterix-tools/pom.xml b/asterix-tools/pom.xml
index 861dd79..43123f0 100644
--- a/asterix-tools/pom.xml
+++ b/asterix-tools/pom.xml
@@ -154,6 +154,12 @@
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
+			<groupId>edu.uci.ics.asterix</groupId>
+			<artifactId>asterix-metadata</artifactId>
+			<version>0.8.1-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.httpcomponents</groupId>
 			<artifactId>httpclient</artifactId>
 			<version>4.2.2</version>
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index 7c18670..b6f693a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -1,25 +1,5 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.asterix.tools.external.data;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.nio.CharBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -27,233 +7,81 @@
 import java.util.List;
 import java.util.Random;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
+import edu.uci.ics.asterix.tools.external.data.TweetGenerator.DataMode;
 
 public class DataGenerator {
 
     private RandomDateGenerator randDateGen;
+
     private RandomNameGenerator randNameGen;
-    private RandomEmploymentGenerator randEmpGen;
+
     private RandomMessageGenerator randMessageGen;
+
     private RandomLocationGenerator randLocationGen;
 
-    private DistributionHandler fbDistHandler;
-    private DistributionHandler twDistHandler;
-
-    private int totalFbMessages;
-    private int numFbOnlyUsers;
-    private int totalTwMessages;
-    private int numTwOnlyUsers;
-
-    private int numCommonUsers;
-
-    private int fbUserId;
-    private int twUserId;
-
-    private int fbMessageId;
-    private int twMessageId;
-
     private Random random = new Random();
 
-    private String commonUserFbSuffix = "_fb";
-    private String commonUserTwSuffix = "_tw";
-
-    private String outputDir;
-
-    private PartitionConfiguration partition;
-
-    private FacebookUser fbUser = new FacebookUser();
     private TwitterUser twUser = new TwitterUser();
 
-    private FacebookMessage fbMessage = new FacebookMessage();
     private TweetMessage twMessage = new TweetMessage();
 
-    private int duration;
-
-    public DataGenerator(String[] args) throws Exception {
-        String controllerInstallDir = args[0];
-        String partitionConfXML = controllerInstallDir + "/output/partition-conf.xml";
-        String partitionName = args[1];
-        partition = XMLUtil.getPartitionConfiguration(partitionConfXML, partitionName);
-
-        // 1
-        randDateGen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
-
-        String firstNameFile = controllerInstallDir + "/metadata/firstNames.txt";
-        String lastNameFile = controllerInstallDir + "/metadata/lastNames.txt";
-        String vendorFile = controllerInstallDir + "/metadata/vendors.txt";
-        String jargonFile = controllerInstallDir + "/metadata/jargon.txt";
-        String orgList = controllerInstallDir + "/metadata/org_list.txt";
-
-        randNameGen = new RandomNameGenerator(firstNameFile, lastNameFile);
-        randEmpGen = new RandomEmploymentGenerator(90, new Date(1, 1, 2000), new Date(8, 20, 2012), orgList);
-        randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
-        randMessageGen = new RandomMessageGenerator(vendorFile, jargonFile);
-
-        totalFbMessages = partition.getTargetPartition().getFbMessageIdMax()
-                - partition.getTargetPartition().getFbMessageIdMin() + 1;
-        numFbOnlyUsers = (partition.getTargetPartition().getFbUserKeyMax()
-                - partition.getTargetPartition().getFbUserKeyMin() + 1)
-                - partition.getTargetPartition().getCommonUsers();
-
-        totalTwMessages = partition.getTargetPartition().getTwMessageIdMax()
-                - partition.getTargetPartition().getTwMessageIdMin() + 1;
-        numTwOnlyUsers = (partition.getTargetPartition().getTwUserKeyMax()
-                - partition.getTargetPartition().getTwUserKeyMin() + 1)
-                - partition.getTargetPartition().getCommonUsers();
-
-        numCommonUsers = partition.getTargetPartition().getCommonUsers();
-        fbDistHandler = new DistributionHandler(totalFbMessages, 0.5, numFbOnlyUsers + numCommonUsers);
-        twDistHandler = new DistributionHandler(totalTwMessages, 0.5, numTwOnlyUsers + numCommonUsers);
-
-        fbUserId = partition.getTargetPartition().getFbUserKeyMin();
-        twUserId = partition.getTargetPartition().getTwUserKeyMin();
-
-        fbMessageId = partition.getTargetPartition().getFbMessageIdMin();
-        twMessageId = partition.getTargetPartition().getTwMessageIdMin();
-
-        outputDir = partition.getSourcePartition().getPath();
-    }
-
     public DataGenerator(InitializationInfo info) {
         initialize(info);
     }
 
-    private void generateFacebookOnlyUsers(int numFacebookUsers) throws IOException {
-        FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, true);
-        FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, true);
-
-        for (int i = 0; i < numFacebookUsers; i++) {
-            getFacebookUser(null);
-            appender.appendToFile(fbUser.toString());
-            generateFacebookMessages(fbUser, messageAppender, -1);
-        }
-        appender.close();
-        messageAppender.close();
-    }
-
-    private void generateTwitterOnlyUsers(int numTwitterUsers) throws IOException {
-        FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, true);
-        FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, true);
-
-        for (int i = 0; i < numTwitterUsers; i++) {
-            getTwitterUser(null);
-            appender.appendToFile(twUser.toString());
-            generateTwitterMessages(twUser, messageAppender, -1);
-        }
-        appender.close();
-        messageAppender.close();
-    }
-
-    private void generateCommonUsers() throws IOException {
-        FileAppender fbAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, false);
-        FileAppender twAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, false);
-        FileAppender fbMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, false);
-        FileAppender twMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, false);
-
-        for (int i = 0; i < numCommonUsers; i++) {
-            getFacebookUser(commonUserFbSuffix);
-            fbAppender.appendToFile(fbUser.toString());
-            generateFacebookMessages(fbUser, fbMessageAppender, -1);
-
-            getCorrespondingTwitterUser(fbUser);
-            twAppender.appendToFile(twUser.toString());
-            generateTwitterMessages(twUser, twMessageAppender, -1);
-        }
-
-        fbAppender.close();
-        twAppender.close();
-        fbMessageAppender.close();
-        twMessageAppender.close();
-    }
-
-    private void generateFacebookMessages(FacebookUser user, FileAppender appender, int numMsg) throws IOException {
-        Message message;
-        int numMessages = 0;
-        if (numMsg == -1) {
-            numMessages = fbDistHandler
-                    .getFromDistribution(fbUserId - partition.getTargetPartition().getFbUserKeyMin());
-        }
-        for (int i = 0; i < numMessages; i++) {
-            message = randMessageGen.getNextRandomMessage();
-            Point location = randLocationGen.getRandomPoint();
-            fbMessage.reset(fbMessageId++, user.getId(), random.nextInt(totalFbMessages + 1), location, message);
-            appender.appendToFile(fbMessage.toString());
-        }
-    }
-
-    private void generateTwitterMessages(TwitterUser user, FileAppender appender, int numMsg) throws IOException {
-        Message message;
-        int numMessages = 0;
-        if (numMsg == -1) {
-            numMessages = twDistHandler
-                    .getFromDistribution(twUserId - partition.getTargetPartition().getTwUserKeyMin());
-        }
-
-        for (int i = 0; i < numMessages; i++) {
-            message = randMessageGen.getNextRandomMessage();
-            Point location = randLocationGen.getRandomPoint();
-            DateTime sendTime = randDateGen.getNextRandomDatetime();
-            twMessage.reset(twMessageId, user, location, sendTime, message.getReferredTopics(), message);
-            twMessageId++;
-            appender.appendToFile(twMessage.toString());
-        }
-    }
-
-    public Iterator<TweetMessage> getTwitterMessageIterator(int partition, byte seed) {
-        return new TweetMessageIterator(duration, partition, seed);
-    }
-
     public class TweetMessageIterator implements Iterator<TweetMessage> {
 
         private final int duration;
+        private final GULongIDGenerator[] idGens;
         private long startTime = 0;
-        private final GULongIDGenerator idGen;
+        private int uidGenInUse = 0;
+        private TweetMessage dummyMessage;
+        private DataMode dataMode;
 
-        public TweetMessageIterator(int duration, int partition, byte seed) {
+        public TweetMessageIterator(int duration, GULongIDGenerator[] idGens, DataMode dataMode) {
             this.duration = duration;
-            this.idGen = new GULongIDGenerator(partition, seed);
+            this.idGens = idGens;
+            this.startTime = System.currentTimeMillis();
+            if (dataMode.equals(DataMode.REUSE_DATA)) {
+                dummyMessage = next();
+            }
+            this.dataMode = dataMode;
         }
 
         @Override
         public boolean hasNext() {
-            if (startTime == 0) {
-                startTime = System.currentTimeMillis();
-            }
-            return System.currentTimeMillis() - startTime < duration * 1000;
+            return System.currentTimeMillis() - startTime <= duration * 1000;
         }
 
         @Override
         public TweetMessage next() {
-            getTwitterUser(null);
-            Message message = randMessageGen.getNextRandomMessage();
-            Point location = randLocationGen.getRandomPoint();
-            DateTime sendTime = randDateGen.getNextRandomDatetime();
-            twMessage.reset(idGen.getNextULong(), twUser, location, sendTime, message.getReferredTopics(), message);
-            twMessageId++;
-            if (twUserId > numTwOnlyUsers) {
-                twUserId = 1;
+            TweetMessage msg = null;
+            switch (dataMode) {
+                case NEW_DATA:
+                    getTwitterUser(null);
+                    Message message = randMessageGen.getNextRandomMessage();
+                    Point location = randLocationGen.getRandomPoint();
+                    DateTime sendTime = randDateGen.getNextRandomDatetime();
+                    twMessage.reset(idGens[uidGenInUse].getNextULong(), twUser, location, sendTime,
+                            message.getReferredTopics(), message);
+                    msg = twMessage;
+                    break;
+                case REUSE_DATA:
+                    dummyMessage.setTweetid(idGens[uidGenInUse].getNextULong());
+                    msg = dummyMessage;
+                    break;
             }
-            return twMessage;
-
+            return msg;
         }
 
         @Override
         public void remove() {
             // TODO Auto-generated method stub
+
+        }
+
+        public void toggleUidKeySpace() {
+            uidGenInUse = (uidGenInUse + 1) % idGens.length;
         }
 
     }
@@ -266,74 +94,13 @@
         public String[] vendors = DataGenerator.vendors;
         public String[] jargon = DataGenerator.jargon;
         public String[] org_list = DataGenerator.org_list;
-        public int percentEmployed = 90;
-        public Date employmentStartDate = new Date(1, 1, 2000);
-        public Date employmentEndDate = new Date(31, 12, 2012);
-        public int totalFbMessages;
-        public int numFbOnlyUsers;
-        public int totalTwMessages;
-        public int numTwOnlyUsers = 5000;
-        public int numCommonUsers;
-        public int fbUserIdMin;
-        public int fbMessageIdMin;
-        public int twUserIdMin;
-        public int twMessageIdMin;
-        public int timeDurationInSecs = 60;
-
     }
 
     public void initialize(InitializationInfo info) {
         randDateGen = new RandomDateGenerator(info.startDate, info.endDate);
         randNameGen = new RandomNameGenerator(info.firstNames, info.lastNames);
-        randEmpGen = new RandomEmploymentGenerator(info.percentEmployed, info.employmentStartDate,
-                info.employmentEndDate, info.org_list);
         randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
         randMessageGen = new RandomMessageGenerator(info.vendors, info.jargon);
-        fbDistHandler = new DistributionHandler(info.totalFbMessages, 0.5, info.numFbOnlyUsers + info.numCommonUsers);
-        twDistHandler = new DistributionHandler(info.totalTwMessages, 0.5, info.numTwOnlyUsers + info.numCommonUsers);
-        fbUserId = info.fbUserIdMin;
-        twUserId = info.twUserIdMin;
-
-        fbMessageId = info.fbMessageIdMin;
-        twMessageId = info.twMessageIdMin;
-        duration = info.timeDurationInSecs;
-    }
-
-    public static void main(String args[]) throws Exception {
-        if (args.length < 2) {
-            printUsage();
-            System.exit(1);
-        }
-
-        DataGenerator dataGenerator = new DataGenerator(args);
-        dataGenerator.generateData();
-    }
-
-    public static void printUsage() {
-        System.out.println(" Error: Invalid number of arguments ");
-        System.out.println(" Usage :" + " DataGenerator <path to configuration file> <partition name> ");
-    }
-
-    public void generateData() throws IOException {
-        generateFacebookOnlyUsers(numFbOnlyUsers);
-        generateTwitterOnlyUsers(numTwOnlyUsers);
-        generateCommonUsers();
-        System.out.println("Partition :" + partition.getTargetPartition().getName() + " finished");
-    }
-
-    public void getFacebookUser(String usernameSuffix) {
-        String suggestedName = randNameGen.getRandomName();
-        String[] nameComponents = suggestedName.split(" ");
-        String name = nameComponents[0] + nameComponents[1];
-        if (usernameSuffix != null) {
-            name = name + usernameSuffix;
-        }
-        String alias = nameComponents[0];
-        String userSince = randDateGen.getNextRandomDatetime().toString();
-        int numFriends = random.nextInt(25);
-        int[] friendIds = RandomUtil.getKFromN(numFriends, (numFbOnlyUsers + numCommonUsers));
-        Employment emp = randEmpGen.getRandomEmployment();
-        fbUser.reset(fbUserId++, alias, name, userSince, friendIds, emp);
     }
 
     public void getTwitterUser(String usernameSuffix) {
@@ -348,17 +115,6 @@
         int statusesCount = random.nextInt(500); // draw from Zipfian
         int followersCount = random.nextInt((int) (200));
         twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
-        twUserId++;
-    }
-
-    public void getCorrespondingTwitterUser(FacebookUser fbUser) {
-        String screenName = fbUser.getName().substring(0, fbUser.getName().lastIndexOf(commonUserFbSuffix))
-                + commonUserTwSuffix;
-        String name = screenName.split(" ")[0];
-        int numFriends = random.nextInt((int) ((numTwOnlyUsers + numCommonUsers)));
-        int statusesCount = random.nextInt(500); // draw from Zipfian
-        int followersCount = random.nextInt((int) (numTwOnlyUsers + numCommonUsers));
-        twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
     }
 
     public static class RandomDateGenerator {
@@ -427,15 +183,6 @@
             return recentDate;
         }
 
-        public static void main(String args[]) throws Exception {
-            RandomDateGenerator dgen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
-            while (true) {
-                Date nextDate = dgen.getNextRandomDate();
-                if (nextDate.getDay() == 0) {
-                    throw new Exception("invalid date " + nextDate);
-                }
-            }
-        }
     }
 
     public static class DateTime extends Date {
@@ -443,15 +190,15 @@
         private String hour = "10";
         private String min = "10";
         private String sec = "00";
-        private long chrononTime;
 
         public DateTime(int month, int day, int year, String hour, String min, String sec) {
             super(month, day, year);
             this.hour = hour;
             this.min = min;
             this.sec = sec;
-            chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
-                    Integer.parseInt(sec)).getTime();
+        }
+
+        public DateTime() {
         }
 
         public void reset(int month, int day, int year, String hour, String min, String sec) {
@@ -461,11 +208,6 @@
             this.hour = hour;
             this.min = min;
             this.sec = sec;
-            chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
-                    Integer.parseInt(sec)).getTime();
-        }
-
-        public DateTime() {
         }
 
         public DateTime(Date date) {
@@ -483,22 +225,6 @@
             this.sec = (sec < 10) ? "0" : "" + sec;
         }
 
-        public long getChrononTime() {
-            return chrononTime;
-        }
-
-        public String getHour() {
-            return hour;
-        }
-
-        public String getMin() {
-            return min;
-        }
-
-        public String getSec() {
-            return sec;
-        }
-
         public String toString() {
             StringBuilder builder = new StringBuilder();
             builder.append("datetime");
@@ -532,10 +258,6 @@
             length = 0;
         }
 
-        public char[] getMessage() {
-            return message;
-        }
-
         public List<String> getReferredTopics() {
             return referredTopics;
         }
@@ -598,11 +320,6 @@
 
         private final String[] connectors = new String[] { "_", "#", "$", "@" };
 
-        public RandomNameGenerator(String firstNameFilePath, String lastNameFilePath) throws IOException {
-            firstNames = FileUtil.listyFile(new File(firstNameFilePath)).toArray(new String[] {});
-            lastNames = FileUtil.listyFile(new File(lastNameFilePath)).toArray(new String[] {});
-        }
-
         public RandomNameGenerator(String[] firstNames, String[] lastNames) {
             this.firstNames = firstNames;
             this.lastNames = lastNames;
@@ -631,12 +348,6 @@
 
         private final MessageTemplate messageTemplate;
 
-        public RandomMessageGenerator(String vendorFilePath, String jargonFilePath) throws IOException {
-            List<String> vendors = FileUtil.listyFile(new File(vendorFilePath));
-            List<String> jargon = FileUtil.listyFile(new File(jargonFilePath));
-            this.messageTemplate = new MessageTemplate(vendors, jargon);
-        }
-
         public RandomMessageGenerator(String[] vendors, String[] jargon) {
             List<String> vendorList = new ArrayList<String>();
             for (String v : vendors) {
@@ -737,106 +448,15 @@
         }
     }
 
-    public static class FileUtil {
-
-        public static List<String> listyFile(File file) throws IOException {
-            BufferedReader reader = new BufferedReader(new FileReader(file));
-            String line;
-            List<String> list = new ArrayList<String>();
-            while (true) {
-                line = reader.readLine();
-                if (line == null) {
-                    break;
-                }
-                list.add(line);
-            }
-            reader.close();
-            return list;
-        }
-
-        public static FileAppender getFileAppender(String filePath, boolean createIfNotExists, boolean overwrite)
-                throws IOException {
-            return new FileAppender(filePath, createIfNotExists, overwrite);
-        }
-    }
-
-    public static class FileAppender {
-
-        private final BufferedWriter writer;
-
-        public FileAppender(String filePath, boolean createIfNotExists, boolean overwrite) throws IOException {
-            File file = new File(filePath);
-            if (!file.exists()) {
-                if (createIfNotExists) {
-                    new File(file.getParent()).mkdirs();
-                } else {
-                    throw new IOException("path " + filePath + " does not exists");
-                }
-            }
-            this.writer = new BufferedWriter(new FileWriter(file, !overwrite));
-        }
-
-        public void appendToFile(String content) throws IOException {
-            writer.append(content);
-            writer.append("\n");
-        }
-
-        public void close() throws IOException {
-            writer.close();
-        }
-    }
-
-    public static class RandomEmploymentGenerator {
-
-        private final int percentEmployed;
-        private final Random random = new Random();
-        private final RandomDateGenerator randDateGen;
-        private final List<String> organizations;
-        private Employment emp;
-
-        public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String orgListPath)
-                throws IOException {
-            this.percentEmployed = percentEmployed;
-            this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
-            organizations = FileUtil.listyFile(new File(orgListPath));
-            emp = new Employment();
-        }
-
-        public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String[] orgList) {
-            this.percentEmployed = percentEmployed;
-            this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
-            organizations = new ArrayList<String>();
-            for (String org : orgList) {
-                organizations.add(org);
-            }
-            emp = new Employment();
-        }
-
-        public Employment getRandomEmployment() {
-            int empployed = random.nextInt(100) + 1;
-            boolean isEmployed = false;
-            if (empployed <= percentEmployed) {
-                isEmployed = true;
-            }
-            Date startDate = randDateGen.getNextRandomDate();
-            Date endDate = null;
-            if (!isEmployed) {
-                endDate = randDateGen.getNextRecentDate(startDate);
-            }
-            String org = organizations.get(random.nextInt(organizations.size()));
-            emp.reset(org, startDate, endDate);
-            return emp;
-        }
-    }
-
     public static class RandomLocationGenerator {
 
+        private Random random = new Random();
+
         private final int beginLat;
         private final int endLat;
         private final int beginLong;
         private final int endLong;
 
-        private Random random = new Random();
         private Point point;
 
         public RandomLocationGenerator(int beginLat, int endLat, int beginLong, int endLong) {
@@ -862,417 +482,6 @@
 
     }
 
-    public static class PartitionConfiguration {
-
-        private final TargetPartition targetPartition;
-        private final SourcePartition sourcePartition;
-
-        public PartitionConfiguration(SourcePartition sourcePartition, TargetPartition targetPartition) {
-            this.sourcePartition = sourcePartition;
-            this.targetPartition = targetPartition;
-        }
-
-        public TargetPartition getTargetPartition() {
-            return targetPartition;
-        }
-
-        public SourcePartition getSourcePartition() {
-            return sourcePartition;
-        }
-
-    }
-
-    public static class SourcePartition {
-
-        private final String name;
-        private final String host;
-        private final String path;
-
-        public SourcePartition(String name, String host, String path) {
-            this.name = name;
-            this.host = host;
-            this.path = path;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public String getHost() {
-            return host;
-        }
-
-        public String getPath() {
-            return path;
-        }
-    }
-
-    public static class TargetPartition {
-        private final String name;
-        private final String host;
-        private final String path;
-        private final int fbUserKeyMin;
-        private final int fbUserKeyMax;
-        private final int twUserKeyMin;
-        private final int twUserKeyMax;
-        private final int fbMessageIdMin;
-        private final int fbMessageIdMax;
-        private final int twMessageIdMin;
-        private final int twMessageIdMax;
-        private final int commonUsers;
-
-        public TargetPartition(String partitionName, String host, String path, int fbUserKeyMin, int fbUserKeyMax,
-                int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
-                int twMessageIdMax, int commonUsers) {
-            this.name = partitionName;
-            this.host = host;
-            this.path = path;
-            this.fbUserKeyMin = fbUserKeyMin;
-            this.fbUserKeyMax = fbUserKeyMax;
-            this.twUserKeyMin = twUserKeyMin;
-            this.twUserKeyMax = twUserKeyMax;
-            this.twMessageIdMin = twMessageIdMin;
-            this.twMessageIdMax = twMessageIdMax;
-            this.fbMessageIdMin = fbMessageIdMin;
-            this.fbMessageIdMax = fbMessageIdMax;
-            this.commonUsers = commonUsers;
-        }
-
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append(name);
-            builder.append(" ");
-            builder.append(host);
-            builder.append("\n");
-            builder.append(path);
-            builder.append("\n");
-            builder.append("fbUser:key:min");
-            builder.append(fbUserKeyMin);
-
-            builder.append("\n");
-            builder.append("fbUser:key:max");
-            builder.append(fbUserKeyMax);
-
-            builder.append("\n");
-            builder.append("twUser:key:min");
-            builder.append(twUserKeyMin);
-
-            builder.append("\n");
-            builder.append("twUser:key:max");
-            builder.append(twUserKeyMax);
-
-            builder.append("\n");
-            builder.append("fbMessage:key:min");
-            builder.append(fbMessageIdMin);
-
-            builder.append("\n");
-            builder.append("fbMessage:key:max");
-            builder.append(fbMessageIdMax);
-
-            builder.append("\n");
-            builder.append("twMessage:key:min");
-            builder.append(twMessageIdMin);
-
-            builder.append("\n");
-            builder.append("twMessage:key:max");
-            builder.append(twMessageIdMax);
-
-            builder.append("\n");
-            builder.append("twMessage:key:max");
-            builder.append(twMessageIdMax);
-
-            builder.append("\n");
-            builder.append("commonUsers");
-            builder.append(commonUsers);
-
-            return new String(builder);
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public String getHost() {
-            return host;
-        }
-
-        public int getFbUserKeyMin() {
-            return fbUserKeyMin;
-        }
-
-        public int getFbUserKeyMax() {
-            return fbUserKeyMax;
-        }
-
-        public int getTwUserKeyMin() {
-            return twUserKeyMin;
-        }
-
-        public int getTwUserKeyMax() {
-            return twUserKeyMax;
-        }
-
-        public int getFbMessageIdMin() {
-            return fbMessageIdMin;
-        }
-
-        public int getFbMessageIdMax() {
-            return fbMessageIdMax;
-        }
-
-        public int getTwMessageIdMin() {
-            return twMessageIdMin;
-        }
-
-        public int getTwMessageIdMax() {
-            return twMessageIdMax;
-        }
-
-        public int getCommonUsers() {
-            return commonUsers;
-        }
-
-        public String getPath() {
-            return path;
-        }
-    }
-
-    public static class Employment {
-
-        private String organization;
-        private Date startDate;
-        private Date endDate;
-
-        public Employment(String organization, Date startDate, Date endDate) {
-            this.organization = organization;
-            this.startDate = startDate;
-            this.endDate = endDate;
-        }
-
-        public Employment() {
-        }
-
-        public void reset(String organization, Date startDate, Date endDate) {
-            this.organization = organization;
-            this.startDate = startDate;
-            this.endDate = endDate;
-        }
-
-        public String getOrganization() {
-            return organization;
-        }
-
-        public Date getStartDate() {
-            return startDate;
-        }
-
-        public Date getEndDate() {
-            return endDate;
-        }
-
-        public String toString() {
-            StringBuilder builder = new StringBuilder("");
-            builder.append("{");
-            builder.append("\"organization-name\":");
-            builder.append("\"" + organization + "\"");
-            builder.append(",");
-            builder.append("\"start-date\":");
-            builder.append(startDate);
-            if (endDate != null) {
-                builder.append(",");
-                builder.append("\"end-date\":");
-                builder.append(endDate);
-            }
-            builder.append("}");
-            return new String(builder);
-        }
-
-    }
-
-    public static class FacebookMessage {
-
-        private int messageId;
-        private int authorId;
-        private int inResponseTo;
-        private Point senderLocation;
-        private Message message;
-
-        public int getMessageId() {
-            return messageId;
-        }
-
-        public int getAuthorID() {
-            return authorId;
-        }
-
-        public Point getSenderLocation() {
-            return senderLocation;
-        }
-
-        public Message getMessage() {
-            return message;
-        }
-
-        public int getInResponseTo() {
-            return inResponseTo;
-        }
-
-        public FacebookMessage() {
-
-        }
-
-        public FacebookMessage(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
-            this.messageId = messageId;
-            this.authorId = authorId;
-            this.inResponseTo = inResponseTo;
-            this.senderLocation = senderLocation;
-            this.message = message;
-        }
-
-        public void reset(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
-            this.messageId = messageId;
-            this.authorId = authorId;
-            this.inResponseTo = inResponseTo;
-            this.senderLocation = senderLocation;
-            this.message = message;
-        }
-
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("{");
-            builder.append("\"message-id\":");
-            builder.append(messageId);
-            builder.append(",");
-            builder.append("\"author-id\":");
-            builder.append(authorId);
-            builder.append(",");
-            builder.append("\"in-response-to\":");
-            builder.append(inResponseTo);
-            builder.append(",");
-            builder.append("\"sender-location\":");
-            builder.append(senderLocation);
-            builder.append(",");
-            builder.append("\"message\":");
-            builder.append("\"");
-            for (int i = 0; i < message.getLength(); i++) {
-                builder.append(message.charAt(i));
-            }
-            builder.append("\"");
-            builder.append("}");
-            return new String(builder);
-        }
-    }
-
-    public static class FacebookUser {
-
-        private int id;
-        private String alias;
-        private String name;
-        private String userSince;
-        private int[] friendIds;
-        private Employment employment;
-
-        public FacebookUser() {
-
-        }
-
-        public FacebookUser(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
-            this.id = id;
-            this.alias = alias;
-            this.name = name;
-            this.userSince = userSince;
-            this.friendIds = friendIds;
-            this.employment = employment;
-        }
-
-        public int getId() {
-            return id;
-        }
-
-        public String getAlias() {
-            return alias;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public String getUserSince() {
-            return userSince;
-        }
-
-        public int[] getFriendIds() {
-            return friendIds;
-        }
-
-        public Employment getEmployment() {
-            return employment;
-        }
-
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("{");
-            builder.append("\"id\":" + id);
-            builder.append(",");
-            builder.append("\"alias\":" + "\"" + alias + "\"");
-            builder.append(",");
-            builder.append("\"name\":" + "\"" + name + "\"");
-            builder.append(",");
-            builder.append("\"user-since\":" + userSince);
-            builder.append(",");
-            builder.append("\"friend-ids\":");
-            builder.append("{{");
-            for (int i = 0; i < friendIds.length; i++) {
-                builder.append(friendIds[i]);
-                builder.append(",");
-            }
-            if (friendIds.length > 0) {
-                builder.deleteCharAt(builder.lastIndexOf(","));
-            }
-            builder.append("}}");
-            builder.append(",");
-            builder.append("\"employment\":");
-            builder.append("[");
-            builder.append(employment.toString());
-            builder.append("]");
-            builder.append("}");
-            return builder.toString();
-        }
-
-        public void setId(int id) {
-            this.id = id;
-        }
-
-        public void setAlias(String alias) {
-            this.alias = alias;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public void setUserSince(String userSince) {
-            this.userSince = userSince;
-        }
-
-        public void setFriendIds(int[] friendIds) {
-            this.friendIds = friendIds;
-        }
-
-        public void setEmployment(Employment employment) {
-            this.employment = employment;
-        }
-
-        public void reset(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
-            this.id = id;
-            this.alias = alias;
-            this.name = name;
-            this.userSince = userSince;
-            this.friendIds = friendIds;
-            this.employment = employment;
-        }
-    }
-
     public static class TweetMessage {
 
         private long tweetid;
@@ -1283,7 +492,6 @@
         private Message messageText;
 
         public TweetMessage() {
-
         }
 
         public TweetMessage(long tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
@@ -1461,478 +669,6 @@
 
     }
 
-    public static class DistributionHandler {
-
-        private final ZipfGenerator zipfGen;
-        private final int totalUsers;
-        private final int totalMessages;
-        private Random random = new Random();
-
-        public DistributionHandler(int totalMessages, double skew, int totalNumUsers) {
-            zipfGen = new ZipfGenerator(totalMessages, skew);
-            totalUsers = totalNumUsers;
-            this.totalMessages = totalMessages;
-        }
-
-        public int getFromDistribution(int rank) {
-            double prob = zipfGen.getProbability(rank);
-            int numMessages = (int) (prob * totalMessages);
-            return numMessages;
-        }
-
-        public static void main(String args[]) {
-            int totalMessages = 1000 * 4070;
-            double skew = 0.5;
-            int totalUsers = 4070;
-            DistributionHandler d = new DistributionHandler(totalMessages, skew, totalUsers);
-            int sum = 0;
-            for (int i = totalUsers; i >= 1; i--) {
-                float contrib = d.getFromDistribution(i);
-                sum += contrib;
-                System.out.println(i + ":" + contrib);
-            }
-
-            System.out.println("SUM" + ":" + sum);
-
-        }
-    }
-
-    public static class ZipfGenerator {
-
-        private Random rnd = new Random(System.currentTimeMillis());
-        private int size;
-        private double skew;
-        private double bottom = 0;
-
-        public ZipfGenerator(int size, double skew) {
-            this.size = size;
-            this.skew = skew;
-            for (int i = 1; i < size; i++) {
-                this.bottom += (1 / Math.pow(i, this.skew));
-            }
-        }
-
-        // the next() method returns an rank id. The frequency of returned rank
-        // ids are follows Zipf distribution.
-        public int next() {
-            int rank;
-            double friquency = 0;
-            double dice;
-            rank = rnd.nextInt(size);
-            friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
-            dice = rnd.nextDouble();
-            while (!(dice < friquency)) {
-                rank = rnd.nextInt(size);
-                friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
-                dice = rnd.nextDouble();
-            }
-            return rank;
-        }
-
-        // This method returns a probability that the given rank occurs.
-        public double getProbability(int rank) {
-            return (1.0d / Math.pow(rank, this.skew)) / this.bottom;
-        }
-
-        public static void main(String[] args) throws IOException {
-            int total = (int) (3.7 * 1000 * 1000);
-            int skew = 2;
-            int numUsers = 1000 * 1000;
-            /*
-             * if (args.length != 2) { System.out.println("usage:" +
-             * "./zipf size skew"); System.exit(-1); }
-             */
-            BufferedWriter buf = new BufferedWriter(new FileWriter(new File("/tmp/zip_output")));
-            ZipfGenerator zipf = new ZipfGenerator(total, skew);
-            double sum = 0;
-            for (int i = 1; i <= numUsers; i++) {
-                double prob = zipf.getProbability(i);
-                double contribution = (double) (prob * total);
-                String contrib = i + ":" + contribution;
-                buf.write(contrib);
-                buf.write("\n");
-                System.out.println(contrib);
-                sum += contribution;
-            }
-            System.out.println("sum is :" + sum);
-        }
-    }
-
-    public static class PartitionElement implements ILibraryElement {
-        private final String name;
-        private final String host;
-        private final int fbUserKeyMin;
-        private final int fbUserKeyMax;
-        private final int twUserKeyMin;
-        private final int twUserKeyMax;
-        private final int fbMessageIdMin;
-        private final int fbMessageIdMax;
-        private final int twMessageIdMin;
-        private final int twMessageIdMax;
-
-        public PartitionElement(String partitionName, String host, int fbUserKeyMin, int fbUserKeyMax,
-                int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
-                int twMessageIdMax) {
-            this.name = partitionName;
-            this.host = host;
-            this.fbUserKeyMin = fbUserKeyMin;
-            this.fbUserKeyMax = fbUserKeyMax;
-            this.twUserKeyMin = twUserKeyMax;
-            this.twUserKeyMax = twUserKeyMax;
-            this.twMessageIdMin = twMessageIdMin;
-            this.twMessageIdMax = twMessageIdMax;
-            this.fbMessageIdMin = fbMessageIdMin;
-            this.fbMessageIdMax = fbMessageIdMax;
-        }
-
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append(name);
-            builder.append(" ");
-            builder.append(host);
-            builder.append("\n");
-            builder.append("fbUser:key:min");
-            builder.append(fbUserKeyMin);
-
-            builder.append("\n");
-            builder.append("fbUser:key:max");
-            builder.append(fbUserKeyMax);
-
-            builder.append("\n");
-            builder.append("twUser:key:min");
-            builder.append(twUserKeyMin);
-
-            builder.append("\n");
-            builder.append("twUser:key:max");
-            builder.append(twUserKeyMax);
-
-            builder.append("\n");
-            builder.append("fbMessage:key:min");
-            builder.append(fbMessageIdMin);
-
-            builder.append("\n");
-            builder.append("fbMessage:key:max");
-            builder.append(fbMessageIdMax);
-
-            builder.append("\n");
-            builder.append("twMessage:key:min");
-            builder.append(twMessageIdMin);
-
-            builder.append("\n");
-            builder.append("twMessage:key:max");
-            builder.append(twMessageIdMax);
-
-            builder.append("\n");
-            builder.append("twMessage:key:max");
-            builder.append(twUserKeyMin);
-
-            return new String(builder);
-        }
-
-        @Override
-        public String getName() {
-            return "Partition";
-        }
-
-    }
-
-    interface ILibraryElement {
-
-        public enum ElementType {
-            PARTITION
-        }
-
-        public String getName();
-
-    }
-
-    public static class Configuration {
-
-        private final float numMB;
-        private final String unit;
-
-        private final List<SourcePartition> sourcePartitions;
-        private List<TargetPartition> targetPartitions;
-
-        public Configuration(float numMB, String unit, List<SourcePartition> partitions) throws IOException {
-            this.numMB = numMB;
-            this.unit = unit;
-            this.sourcePartitions = partitions;
-
-        }
-
-        public float getNumMB() {
-            return numMB;
-        }
-
-        public String getUnit() {
-            return unit;
-        }
-
-        public List<SourcePartition> getSourcePartitions() {
-            return sourcePartitions;
-        }
-
-        public List<TargetPartition> getTargetPartitions() {
-            return targetPartitions;
-        }
-
-        public void setTargetPartitions(List<TargetPartition> targetPartitions) {
-            this.targetPartitions = targetPartitions;
-        }
-
-    }
-
-    public static class XMLUtil {
-
-        public static void writeToXML(Configuration conf, String filePath) throws IOException,
-                ParserConfigurationException, TransformerException {
-
-            DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
-
-            // root elements
-            Document doc = docBuilder.newDocument();
-            Element rootElement = doc.createElement("Partitions");
-            doc.appendChild(rootElement);
-
-            int index = 0;
-            for (TargetPartition partition : conf.getTargetPartitions()) {
-                writePartitionElement(conf.getSourcePartitions().get(index), partition, rootElement, doc);
-            }
-
-            TransformerFactory transformerFactory = TransformerFactory.newInstance();
-            Transformer transformer = transformerFactory.newTransformer();
-
-            transformer.setOutputProperty(OutputKeys.ENCODING, "utf-8");
-            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-
-            DOMSource source = new DOMSource(doc);
-            StreamResult result = new StreamResult(new File(filePath));
-
-            transformer.transform(source, result);
-
-        }
-
-        public static void writePartitionInfo(Configuration conf, String filePath) throws IOException {
-            BufferedWriter bw = new BufferedWriter(new FileWriter(filePath));
-            for (SourcePartition sp : conf.getSourcePartitions()) {
-                bw.write(sp.getHost() + ":" + sp.getName() + ":" + sp.getPath());
-                bw.write("\n");
-            }
-            bw.close();
-        }
-
-        public static Document getDocument(String filePath) throws Exception {
-            File inputFile = new File(filePath);
-            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-            DocumentBuilder db = dbf.newDocumentBuilder();
-            Document doc = db.parse(inputFile);
-            doc.getDocumentElement().normalize();
-            return doc;
-        }
-
-        public static Configuration getConfiguration(String filePath) throws Exception {
-            Configuration conf = getConfiguration(getDocument(filePath));
-            PartitionMetrics metrics = new PartitionMetrics(conf.getNumMB(), conf.getUnit(), conf.getSourcePartitions()
-                    .size());
-            List<TargetPartition> targetPartitions = getTargetPartitions(metrics, conf.getSourcePartitions());
-            conf.setTargetPartitions(targetPartitions);
-            return conf;
-        }
-
-        public static Configuration getConfiguration(Document document) throws IOException {
-            Element rootEle = document.getDocumentElement();
-            NodeList nodeList = rootEle.getChildNodes();
-            float size = Float.parseFloat(getStringValue((Element) nodeList, "size"));
-            String unit = getStringValue((Element) nodeList, "unit");
-            List<SourcePartition> sourcePartitions = getSourcePartitions(document);
-            return new Configuration(size, unit, sourcePartitions);
-        }
-
-        public static List<SourcePartition> getSourcePartitions(Document document) {
-            Element rootEle = document.getDocumentElement();
-            NodeList nodeList = rootEle.getElementsByTagName("partition");
-            List<SourcePartition> sourcePartitions = new ArrayList<SourcePartition>();
-            for (int i = 0; i < nodeList.getLength(); i++) {
-                Node node = nodeList.item(i);
-                sourcePartitions.add(getSourcePartition((Element) node));
-            }
-            return sourcePartitions;
-        }
-
-        public static SourcePartition getSourcePartition(Element functionElement) {
-            String name = getStringValue(functionElement, "name");
-            String host = getStringValue(functionElement, "host");
-            String path = getStringValue(functionElement, "path");
-            SourcePartition sp = new SourcePartition(name, host, path);
-            return sp;
-        }
-
-        public static String getStringValue(Element element, String tagName) {
-            String textValue = null;
-            NodeList nl = element.getElementsByTagName(tagName);
-            if (nl != null && nl.getLength() > 0) {
-                Element el = (Element) nl.item(0);
-                textValue = el.getFirstChild().getNodeValue();
-            }
-            return textValue;
-        }
-
-        public static PartitionConfiguration getPartitionConfiguration(String filePath, String partitionName)
-                throws Exception {
-            PartitionConfiguration pconf = getPartitionConfigurations(getDocument(filePath), partitionName);
-            return pconf;
-        }
-
-        public static PartitionConfiguration getPartitionConfigurations(Document document, String partitionName)
-                throws IOException {
-
-            Element rootEle = document.getDocumentElement();
-            NodeList nodeList = rootEle.getElementsByTagName("Partition");
-
-            for (int i = 0; i < nodeList.getLength(); i++) {
-                Node node = nodeList.item(i);
-                Element nodeElement = (Element) node;
-                String name = getStringValue(nodeElement, "name");
-                if (!name.equalsIgnoreCase(partitionName)) {
-                    continue;
-                }
-                String host = getStringValue(nodeElement, "host");
-                String path = getStringValue(nodeElement, "path");
-
-                String fbUserKeyMin = getStringValue(nodeElement, "fbUserKeyMin");
-                String fbUserKeyMax = getStringValue(nodeElement, "fbUserKeyMax");
-                String twUserKeyMin = getStringValue(nodeElement, "twUserKeyMin");
-                String twUserKeyMax = getStringValue(nodeElement, "twUserKeyMax");
-                String fbMessageKeyMin = getStringValue(nodeElement, "fbMessageKeyMin");
-
-                String fbMessageKeyMax = getStringValue(nodeElement, "fbMessageKeyMax");
-                String twMessageKeyMin = getStringValue(nodeElement, "twMessageKeyMin");
-                String twMessageKeyMax = getStringValue(nodeElement, "twMessageKeyMax");
-                String numCommonUsers = getStringValue(nodeElement, "numCommonUsers");
-
-                SourcePartition sp = new SourcePartition(name, host, path);
-
-                TargetPartition tp = new TargetPartition(partitionName, host, path, Integer.parseInt(fbUserKeyMin),
-                        Integer.parseInt(fbUserKeyMax), Integer.parseInt(twUserKeyMin), Integer.parseInt(twUserKeyMax),
-                        Integer.parseInt(fbMessageKeyMin), Integer.parseInt(fbMessageKeyMax),
-                        Integer.parseInt(twMessageKeyMin), Integer.parseInt(twMessageKeyMax),
-                        Integer.parseInt(numCommonUsers));
-                PartitionConfiguration pc = new PartitionConfiguration(sp, tp);
-                return pc;
-            }
-            return null;
-        }
-
-        public static List<TargetPartition> getTargetPartitions(PartitionMetrics metrics,
-                List<SourcePartition> sourcePartitions) {
-            List<TargetPartition> partitions = new ArrayList<TargetPartition>();
-            int fbUserKeyMin = 1;
-            int twUserKeyMin = 1;
-            int fbMessageIdMin = 1;
-            int twMessageIdMin = 1;
-
-            for (SourcePartition sp : sourcePartitions) {
-                int fbUserKeyMax = fbUserKeyMin + metrics.getFbOnlyUsers() + metrics.getCommonUsers() - 1;
-                int twUserKeyMax = twUserKeyMin + metrics.getTwitterOnlyUsers() + metrics.getCommonUsers() - 1;
-
-                int fbMessageIdMax = fbMessageIdMin + metrics.getFbMessages() - 1;
-                int twMessageIdMax = twMessageIdMin + metrics.getTwMessages() - 1;
-                TargetPartition pe = new TargetPartition(sp.getName(), sp.getHost(), sp.getPath(), fbUserKeyMin,
-                        fbUserKeyMax, twUserKeyMin, twUserKeyMax, fbMessageIdMin, fbMessageIdMax, twMessageIdMin,
-                        twMessageIdMax, metrics.getCommonUsers());
-                partitions.add(pe);
-
-                fbUserKeyMin = fbUserKeyMax + 1;
-                twUserKeyMin = twUserKeyMax + 1;
-
-                fbMessageIdMin = fbMessageIdMax + 1;
-                twMessageIdMin = twMessageIdMax + 1;
-            }
-
-            return partitions;
-        }
-
-        public static void writePartitionElement(SourcePartition sourcePartition, TargetPartition partition,
-                Element rootElement, Document doc) {
-            // staff elements
-            Element pe = doc.createElement("Partition");
-            rootElement.appendChild(pe);
-
-            // name element
-            Element name = doc.createElement("name");
-            name.appendChild(doc.createTextNode("" + partition.getName()));
-            pe.appendChild(name);
-
-            // host element
-            Element host = doc.createElement("host");
-            host.appendChild(doc.createTextNode("" + partition.getHost()));
-            pe.appendChild(host);
-
-            // path element
-            Element path = doc.createElement("path");
-            path.appendChild(doc.createTextNode("" + partition.getPath()));
-            pe.appendChild(path);
-
-            // fbUserKeyMin element
-            Element fbUserKeyMin = doc.createElement("fbUserKeyMin");
-            fbUserKeyMin.appendChild(doc.createTextNode("" + partition.getFbUserKeyMin()));
-            pe.appendChild(fbUserKeyMin);
-
-            // fbUserKeyMax element
-            Element fbUserKeyMax = doc.createElement("fbUserKeyMax");
-            fbUserKeyMax.appendChild(doc.createTextNode("" + partition.getFbUserKeyMax()));
-            pe.appendChild(fbUserKeyMax);
-
-            // twUserKeyMin element
-            Element twUserKeyMin = doc.createElement("twUserKeyMin");
-            twUserKeyMin.appendChild(doc.createTextNode("" + partition.getTwUserKeyMin()));
-            pe.appendChild(twUserKeyMin);
-
-            // twUserKeyMax element
-            Element twUserKeyMax = doc.createElement("twUserKeyMax");
-            twUserKeyMax.appendChild(doc.createTextNode("" + partition.getTwUserKeyMax()));
-            pe.appendChild(twUserKeyMax);
-
-            // fbMessgeKeyMin element
-            Element fbMessageKeyMin = doc.createElement("fbMessageKeyMin");
-            fbMessageKeyMin.appendChild(doc.createTextNode("" + partition.getFbMessageIdMin()));
-            pe.appendChild(fbMessageKeyMin);
-
-            // fbMessgeKeyMin element
-            Element fbMessageKeyMax = doc.createElement("fbMessageKeyMax");
-            fbMessageKeyMax.appendChild(doc.createTextNode("" + partition.getFbMessageIdMax()));
-            pe.appendChild(fbMessageKeyMax);
-
-            // twMessgeKeyMin element
-            Element twMessageKeyMin = doc.createElement("twMessageKeyMin");
-            twMessageKeyMin.appendChild(doc.createTextNode("" + partition.getTwMessageIdMin()));
-            pe.appendChild(twMessageKeyMin);
-
-            // twMessgeKeyMin element
-            Element twMessageKeyMax = doc.createElement("twMessageKeyMax");
-            twMessageKeyMax.appendChild(doc.createTextNode("" + partition.getTwMessageIdMax()));
-            pe.appendChild(twMessageKeyMax);
-
-            // twMessgeKeyMin element
-            Element numCommonUsers = doc.createElement("numCommonUsers");
-            numCommonUsers.appendChild(doc.createTextNode("" + partition.getCommonUsers()));
-            pe.appendChild(numCommonUsers);
-
-        }
-
-        public static void main(String args[]) throws Exception {
-            String confFile = "/Users/rgrove1/work/research/asterix/icde/data-gen/conf/conf.xml";
-            String outputPath = "/Users/rgrove1/work/research/asterix/icde/data-gen/output/conf-output.xml";
-            Configuration conf = getConfiguration(confFile);
-            writeToXML(conf, outputPath);
-        }
-
-    }
-
     public static class Date {
 
         private int day;
@@ -1992,57 +728,6 @@
         }
     }
 
-    public static class PartitionMetrics {
-
-        private final int fbMessages;
-        private final int twMessages;
-
-        private final int fbOnlyUsers;
-        private final int twitterOnlyUsers;
-        private final int commonUsers;
-
-        public PartitionMetrics(float number, String unit, int numPartitions) throws IOException {
-
-            int factor = 0;
-            if (unit.equalsIgnoreCase("MB")) {
-                factor = 1024 * 1024;
-            } else if (unit.equalsIgnoreCase("GB")) {
-                factor = 1024 * 1024 * 1024;
-            } else if (unit.equalsIgnoreCase("TB")) {
-                factor = 1024 * 1024 * 1024 * 1024;
-            } else
-                throw new IOException("Invalid unit:" + unit);
-
-            fbMessages = (int) (((number * factor * 0.80) / 258) / numPartitions);
-            twMessages = (int) (fbMessages * 1.1 / 0.35);
-
-            fbOnlyUsers = (int) ((number * factor * 0.20 * 0.0043)) / numPartitions;
-            twitterOnlyUsers = (int) (0.25 * fbOnlyUsers);
-            commonUsers = (int) (0.1 * fbOnlyUsers);
-        }
-
-        public int getFbMessages() {
-            return fbMessages;
-        }
-
-        public int getTwMessages() {
-            return twMessages;
-        }
-
-        public int getFbOnlyUsers() {
-            return fbOnlyUsers;
-        }
-
-        public int getTwitterOnlyUsers() {
-            return twitterOnlyUsers;
-        }
-
-        public int getCommonUsers() {
-            return commonUsers;
-        }
-
-    }
-
     public static String[] lastNames = { "Hoopengarner", "Harrow", "Gardner", "Blyant", "Best", "Buttermore", "Gronko",
             "Mayers", "Countryman", "Neely", "Ruhl", "Taggart", "Bash", "Cason", "Hil", "Zalack", "Mingle", "Carr",
             "Rohtin", "Wardle", "Pullman", "Wire", "Kellogg", "Hiles", "Keppel", "Bratton", "Sutton", "Wickes",
@@ -2482,4 +1167,4 @@
             "Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
             "Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
             "zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
-}
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index 4cdda1e..278565a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -4,7 +4,6 @@
 import java.io.InputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -20,23 +19,14 @@
 
     private static final long serialVersionUID = 1L;
 
-    public static final String KEY_PORT = "port";
-
     private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
 
-    private Map<String, String> configuration;
-
     private SocketFeedServer socketFeedServer;
 
-    private static final int DEFAULT_PORT = 2909;
-
-    public GenericSocketFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
-            ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+    public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputtype, int port,
+            IHyracksTaskContext ctx) throws AsterixException, IOException {
         super(parserFactory, outputtype, ctx);
-        this.configuration = configuration;
-        String portValue = (String) this.configuration.get(KEY_PORT);
-        int port = portValue != null ? Integer.parseInt(portValue) : DEFAULT_PORT;
-        this.socketFeedServer = new SocketFeedServer(configuration, outputtype, port);
+        this.socketFeedServer = new SocketFeedServer(outputtype, port);
     }
 
     @Override
@@ -53,8 +43,7 @@
         private ServerSocket serverSocket;
         private InputStream inputStream;
 
-        public SocketFeedServer(Map<String, String> configuration, ARecordType outputtype, int port)
-                throws IOException, AsterixException {
+        public SocketFeedServer(ARecordType outputtype, int port) throws IOException, AsterixException {
             try {
                 serverSocket = new ServerSocket(port);
             } catch (Exception e) {
@@ -70,19 +59,27 @@
         public InputStream getInputStream() {
             Socket socket;
             try {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("waiting for client at " + serverSocket.getLocalPort());
+                }
                 socket = serverSocket.accept();
                 inputStream = socket.getInputStream();
             } catch (IOException e) {
                 if (LOGGER.isLoggable(Level.SEVERE)) {
                     LOGGER.severe("Unable to create input stream required for feed ingestion");
                 }
-                e.printStackTrace();
             }
             return inputStream;
         }
 
         public void stop() throws IOException {
-            serverSocket.close();
+            try {
+                serverSocket.close();
+            } catch (IOException ioe) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to close socket at " + serverSocket.getLocalPort());
+                }
+            }
         }
 
     }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 03c65c7..4fd0453 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -14,34 +14,52 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
 
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Factory class for creating @see{GenericSocketFeedAdapter} The
  * adapter listens at a port for receiving data (from external world).
- * Data received is transformed into Asterix Data Format (ADM) and stored into
- * a dataset a configured in the Adapter configuration.
+ * Data received is transformed into Asterix Data Format (ADM).
  */
 public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
 
-    /**
-     * 
-     */
     private static final long serialVersionUID = 1L;
 
     private ARecordType outputType;
 
+    private List<Pair<String, Integer>> sockets;
+
+    private Mode mode = Mode.IP;
+
+    public static final String KEY_SOCKETS = "sockets";
+
+    public static final String KEY_MODE = "address-type";
+
+    public static enum Mode {
+        NC,
+        IP
+    }
+
     @Override
     public String getName() {
-        return "generic_socket_feed";
+        return "socket_adaptor";
     }
 
     @Override
@@ -59,16 +77,67 @@
         this.configuration = configuration;
         outputType = (ARecordType) outputType;
         this.configureFormat(outputType);
+        this.configureSockets(configuration);
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+        List<String> locations = new ArrayList<String>();
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.first);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
     }
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
+        Pair<String, Integer> socket = sockets.get(partition);
+        return new GenericSocketFeedAdapter(parserFactory, outputType, socket.second, ctx);
     }
 
+    private void configureSockets(Map<String, String> configuration) throws Exception {
+        sockets = new ArrayList<Pair<String, Integer>>();
+        String modeValue = configuration.get(KEY_MODE);
+        if (modeValue != null) {
+            mode = Mode.valueOf(modeValue.trim().toUpperCase());
+        }
+        String socketsValue = configuration.get(KEY_SOCKETS);
+        Map<String, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+        if (socketsValue == null) {
+            throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adaptor configuration");
+        }
+        String[] socketsArray = socketsValue.split(",");
+        Random random = new Random();
+        for (String socket : socketsArray) {
+            String[] socketTokens = socket.split(":");
+            String host = socketTokens[0];
+            int port = Integer.parseInt(socketTokens[1]);
+            Pair<String, Integer> p = null;
+            switch (mode) {
+                case IP:
+                    Set<String> ncsOnIp = ncMap.get(host);
+                    if (ncsOnIp == null || ncsOnIp.isEmpty()) {
+                        throw new IllegalArgumentException("Invalid host " + host
+                                + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                + StringUtils.join(ncMap.keySet(), ", "));
+                    }
+                    String[] ncArray = ncsOnIp.toArray(new String[] {});
+                    String nc = ncArray[random.nextInt(ncArray.length)];
+                    p = new Pair<String, Integer>(nc, port);
+                    break;
+
+                case NC:
+                    p = new Pair<String, Integer>(host, port);
+                    if (!ncs.contains(host)) {
+                        throw new IllegalArgumentException("Invalid NC " + host
+                                + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                + StringUtils.join(ncs, ", "));
+
+                    }
+                    break;
+            }
+            sockets.add(p);
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index b92c3fd..28ff14d 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.asterix.tools.external.data;
 
 import java.io.IOException;
@@ -25,49 +11,93 @@
 
 public class TweetGenerator {
 
+    public static final String NUM_KEY_SPACES = "num-key-spaces";
     public static final String KEY_DURATION = "duration";
     public static final String KEY_TPS = "tps";
-    public static final String KEY_MIN_TPS = "tps-min";
-    public static final String KEY_MAX_TPS = "tps-max";
     public static final String KEY_TPUT_DURATION = "tput-duration";
     public static final String KEY_GUID_SEED = "guid-seed";
+    public static final String KEY_FRAME_WRITER_MODE = "frame-writer-mode";
+    public static final String KEY_DATA_MODE = "data-mode";
 
     public static final String OUTPUT_FORMAT = "output-format";
     public static final String OUTPUT_FORMAT_ARECORD = "arecord";
     public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
 
+    private static final int DEFAULT_DURATION = 60;
+    private static final int DEFAULT_GUID_SEED = 0;
+
     private int duration;
     private TweetMessageIterator tweetIterator = null;
+    private int partition;
+    private int tweetCount = 0;
     private int frameTweetCount = 0;
     private int numFlushedTweets = 0;
     private OutputStream os;
     private DataGenerator dataGenerator = null;
     private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
+    private GULongIDGenerator[] uidGenerators;
+    private int numUidGenerators;
+    private FrameWriterMode frameWriterMode;
+    private DataMode dataMode;
 
-    public TweetGenerator(Map<String, String> configuration, int partition, String format) throws Exception {
-        String value = configuration.get(KEY_DURATION);
-        duration = value != null ? Integer.parseInt(value) : 60;
-        InitializationInfo info = new InitializationInfo();
-        info.timeDurationInSecs = duration;
-        dataGenerator = new DataGenerator(info);
-
-        String seedValue = configuration.get(KEY_GUID_SEED);
-        int seedInt = seedValue != null ? Integer.parseInt(seedValue) : 0;
-        tweetIterator = dataGenerator.new TweetMessageIterator(duration, partition, (byte) seedInt);
+    public int getTweetCount() {
+        return tweetCount;
     }
 
-    private void writeTweetString(TweetMessage next) throws IOException {
-        String tweet = next.toString() + "\n";
-        byte[] b = tweet.getBytes();
-        if (outputBuffer.position() + b.length > outputBuffer.limit()) {
-            flush();
-            numFlushedTweets += frameTweetCount;
-            frameTweetCount = 0;
-            outputBuffer.put(b);
-            frameTweetCount++;
-        } else {
-            outputBuffer.put(b);
-            frameTweetCount++;
+    public enum DataMode {
+        REUSE_DATA,
+        NEW_DATA
+    }
+
+    public enum FrameWriterMode {
+        DUMMY_NO_PARSING,
+        PARSING
+    }
+
+    public TweetGenerator(Map<String, String> configuration, int partition, String format, OutputStream os)
+            throws Exception {
+        this.partition = partition;
+        String value = configuration.get(KEY_DURATION);
+        duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+
+        value = configuration.get(KEY_DATA_MODE);
+        dataMode = value != null ? DataMode.valueOf(value) : DataMode.NEW_DATA;
+        numUidGenerators = configuration.get(NUM_KEY_SPACES) != null ? Integer.parseInt(configuration
+                .get(NUM_KEY_SPACES)) : 1;
+        uidGenerators = new GULongIDGenerator[numUidGenerators];
+
+        int guidSeed = configuration.get(KEY_GUID_SEED) != null ? Integer.parseInt(configuration.get(KEY_GUID_SEED))
+                : DEFAULT_GUID_SEED;
+
+        for (int i = 0; i < uidGenerators.length; i++) {
+            uidGenerators[i] = new GULongIDGenerator(partition, (byte) (i + guidSeed));
+        }
+
+        InitializationInfo info = new InitializationInfo();
+        dataGenerator = new DataGenerator(info);
+        value = configuration.get(KEY_FRAME_WRITER_MODE);
+        frameWriterMode = value != null ? FrameWriterMode.valueOf(value.toUpperCase()) : FrameWriterMode.PARSING;
+        dataMode = configuration.get(KEY_DATA_MODE) != null ? DataMode.valueOf(configuration.get(KEY_DATA_MODE))
+                : DataMode.NEW_DATA;
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerators, dataMode);
+        this.os = os;
+    }
+
+    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+        String tweet = tweetMessage.toString() + "\n";
+        tweetCount++;
+        if (frameWriterMode.equals(FrameWriterMode.PARSING)) {
+            byte[] b = tweet.getBytes();
+            if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+                flush();
+                numFlushedTweets += frameTweetCount;
+                frameTweetCount = 0;
+                outputBuffer.put(b);
+                frameTweetCount++;
+            } else {
+                outputBuffer.put(b);
+                frameTweetCount++;
+            }
         }
     }
 
@@ -80,21 +110,22 @@
         os.write(outputBuffer.array(), 0, outputBuffer.limit());
         outputBuffer.position(0);
         outputBuffer.limit(32 * 1024);
+        tweetIterator.toggleUidKeySpace();
     }
 
     public boolean setNextRecordBatch(int numTweetsInBatch) throws Exception {
-        int count = 0;
-        if (tweetIterator.hasNext()) {
+        boolean moreData = tweetIterator.hasNext();
+        if (!moreData) {
+            System.out.println("TWEET COUNT: [" + partition + "]" + tweetCount);
+            return false;
+        } else {
+            int count = 0;
             while (count < numTweetsInBatch) {
                 writeTweetString(tweetIterator.next());
                 count++;
             }
             return true;
         }
-        return false;
     }
 
-    public void setOutputStream(OutputStream os) {
-        this.os = os;
-    }
-}
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 07e018a..56e4673 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -1,30 +1,11 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.asterix.tools.external.data;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.Date;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
@@ -39,6 +20,8 @@
 
 /**
  * TPS can be configured between 1 and 20,000
+ * 
+ * @author ramang
  */
 public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
 
@@ -46,180 +29,104 @@
 
     private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
 
-    private final TwitterServer twitterServer;
-
-    private TwitterClient twitterClient;
-
-    private static final String LOCALHOST = "127.0.0.1";
-    private static final int PORT = 2909;
-    private static final int TPUT_DURATION_DEFAULT = 5; // 5 seconds
+    private final Map<String, String> configuration;
 
     private ExecutorService executorService = Executors.newCachedThreadPool();
 
+    private PipedOutputStream outputStream = new PipedOutputStream();
+
+    private PipedInputStream inputStream = new PipedInputStream(outputStream);
+
+    private final TwitterServer twitterServer;
+
     public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
-            ARecordType outputtype, IHyracksTaskContext ctx, int partition) throws Exception {
+            ARecordType outputtype, int partition, IHyracksTaskContext ctx) throws Exception {
         super(parserFactory, outputtype, ctx);
-        this.twitterServer = new TwitterServer(configuration, outputtype, executorService, partition);
-        this.twitterClient = new TwitterClient(twitterServer.getPort());
+        this.configuration = configuration;
+        this.twitterServer = new TwitterServer(configuration, partition, outputtype, outputStream, executorService);
     }
 
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
         twitterServer.start();
-        twitterClient.start();
         super.start(partition, writer);
     }
 
     @Override
     public InputStream getInputStream(int partition) throws IOException {
-        return twitterClient.getInputStream();
+        return inputStream;
     }
 
     private static class TwitterServer {
-        private ServerSocket serverSocket;
-        private final Listener listener;
-        private int port = -1;
-        private ExecutorService executorService;
+        private final DataProvider dataProvider;
+        private final ExecutorService executorService;
 
-        public TwitterServer(Map<String, String> configuration, ARecordType outputtype,
-                ExecutorService executorService, int partition) throws Exception {
-            int numAttempts = 0;
-            while (port < 0) {
-                try {
-                    serverSocket = new ServerSocket(PORT + numAttempts);
-                    port = PORT + numAttempts;
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("port: " + (PORT + numAttempts) + " unusable ");
-                    }
-                    numAttempts++;
-                }
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Twitter server configured to use port: " + port);
-            }
-            String dvds = configuration.get("dataverse-dataset");
-            listener = new Listener(serverSocket, configuration, outputtype, dvds, partition);
+        public TwitterServer(Map<String, String> configuration, int partition, ARecordType outputtype, OutputStream os,
+                ExecutorService executorService) throws Exception {
+            dataProvider = new DataProvider(configuration, outputtype, partition, os);
             this.executorService = executorService;
         }
 
-        public void start() {
-            executorService.execute(listener);
-        }
-
         public void stop() throws IOException {
-            listener.stop();
-            serverSocket.close();
+            dataProvider.stop();
         }
 
-        public int getPort() {
-            return port;
-        }
-    }
-
-    private static class TwitterClient {
-
-        private Socket socket;
-        private int port;
-
-        public TwitterClient(int port) throws UnknownHostException, IOException {
-            this.port = port;
-        }
-
-        public InputStream getInputStream() throws IOException {
-            return socket.getInputStream();
-        }
-
-        public void start() throws UnknownHostException, IOException {
-            socket = new Socket(LOCALHOST, port);
+        public void start() {
+            executorService.execute(dataProvider);
         }
 
     }
 
-    private static class Listener implements Runnable {
-
-        private final ServerSocket serverSocket;
-        private Socket socket;
-        private TweetGenerator tweetGenerator;
-        private boolean continuePush = true;
-        private int fixedTps = -1;
-        private int minTps = -1;
-        private int maxTps = -1;
-        private int tputDuration;
-        private Rate task;
-        private Mode mode;
+    private static class DataProvider implements Runnable {
 
         public static final String KEY_MODE = "mode";
 
+        private TweetGenerator tweetGenerator;
+        private boolean continuePush = true;
+        private int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
         public static enum Mode {
             AGGRESSIVE,
-            CONTROLLED,
+            CONTROLLED
         }
 
-        public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype,
-                String datasetName, int partition) throws Exception {
-            this.serverSocket = serverSocket;
-            this.tweetGenerator = new TweetGenerator(configuration, partition, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
-            String value = configuration.get(KEY_MODE);
-            String confValue = null;
-            if (value != null) {
-                mode = Mode.valueOf(value.toUpperCase());
-                switch (mode) {
-                    case AGGRESSIVE:
-                        break;
-                    case CONTROLLED:
-                        confValue = configuration.get(TweetGenerator.KEY_TPS);
-                        if (confValue != null) {
-                            minTps = Integer.parseInt(confValue);
-                            maxTps = minTps;
-                            fixedTps = minTps;
-                        } else {
-                            confValue = configuration.get(TweetGenerator.KEY_MIN_TPS);
-                            if (confValue != null) {
-                                minTps = Integer.parseInt(confValue);
-                            }
-                            confValue = configuration.get(TweetGenerator.KEY_MAX_TPS);
-                            if (confValue != null) {
-                                maxTps = Integer.parseInt(configuration.get(TweetGenerator.KEY_MAX_TPS));
-                            }
-
-                            if (minTps < 0 || maxTps < 0 || minTps > maxTps) {
-                                throw new IllegalArgumentException("Incorrect value for min/max TPS");
-                            }
-                        }
-
-                }
-            } else {
-                mode = Mode.AGGRESSIVE;
+        public DataProvider(Map<String, String> configuration, ARecordType outputtype, int partition, OutputStream os)
+                throws Exception {
+            this.tweetGenerator = new TweetGenerator(configuration, partition, TweetGenerator.OUTPUT_FORMAT_ADM_STRING,
+                    os);
+            this.os = os;
+            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
+                    : Mode.AGGRESSIVE;
+            switch (mode) {
+                case CONTROLLED:
+                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                    if (tpsValue == null) {
+                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                    }
+                    batchSize = Integer.parseInt(tpsValue);
+                    break;
+                case AGGRESSIVE:
+                    batchSize = 5000;
+                    break;
             }
-
-            value = configuration.get(TweetGenerator.KEY_TPUT_DURATION);
-            tputDuration = value != null ? Integer.parseInt(value) : TPUT_DURATION_DEFAULT;
-            task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
         }
 
         @Override
         public void run() {
+            boolean moreData = true;
+            long startBatch;
+            long endBatch;
+
             while (true) {
                 try {
-                    socket = serverSocket.accept();
-                    OutputStream os = socket.getOutputStream();
-                    tweetGenerator.setOutputStream(os);
-                    boolean moreData = true;
-                    Timer timer = new Timer();
-                    timer.schedule(task, tputDuration * 1000, tputDuration * 1000);
-                    long startBatch;
-                    long endBatch;
-                    Random random = new Random();
-                    int batchSize = 0;
                     while (moreData && continuePush) {
                         switch (mode) {
+                            case AGGRESSIVE:
+                                moreData = tweetGenerator.setNextRecordBatch(batchSize);
+                                break;
                             case CONTROLLED:
-                                if (maxTps > 0) {
-                                    batchSize = minTps + random.nextInt((maxTps + 1) - minTps);
-                                } else {
-                                    batchSize = fixedTps;
-                                }
                                 startBatch = System.currentTimeMillis();
                                 moreData = tweetGenerator.setNextRecordBatch(batchSize);
                                 endBatch = System.currentTimeMillis();
@@ -227,30 +134,14 @@
                                     Thread.sleep(1000 - (endBatch - startBatch));
                                 }
                                 break;
-                            case AGGRESSIVE:
-                                batchSize = Integer.MAX_VALUE;
-                                moreData = tweetGenerator.setNextRecordBatch(batchSize);
                         }
                     }
-                    timer.cancel();
                     os.close();
                     break;
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING)) {
                         LOGGER.warning("Exception in adaptor " + e.getMessage());
                     }
-                } finally {
-                    try {
-                        if (socket != null && socket.isClosed()) {
-                            socket.close();
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Closed socket:" + socket.getPort());
-                            }
-                        }
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-
                 }
             }
         }
@@ -259,37 +150,6 @@
             continuePush = false;
         }
 
-        private static class Rate extends TimerTask {
-
-            private final TweetGenerator gen;
-            private final int tputDuration;
-            private final int partition;
-            private final String dataset;
-            private int prevMeasuredTweets = 0;
-
-            public Rate(TweetGenerator gen, int tputDuration, String dataset, int partition) {
-                this.gen = gen;
-                this.tputDuration = tputDuration;
-                this.dataset = dataset;
-                this.partition = partition;
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(new Date() + " " + "Dataset" + " " + "partition" + " " + "Total flushed tweets"
-                            + "\t" + "intantaneous throughput");
-                }
-            }
-
-            @Override
-            public void run() {
-                int currentMeasureTweets = gen.getNumFlushedTweets();
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
-                            + ((currentMeasureTweets - prevMeasuredTweets) / tputDuration) + " ID "
-                            + Thread.currentThread().getId());
-                }
-                prevMeasuredTweets = currentMeasureTweets;
-            }
-
-        }
     }
 
     @Override
@@ -297,4 +157,9 @@
         twitterServer.stop();
     }
 
-}
+    @Override
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 2305c32..41b1915 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2012 by The Regents of the University of California
+x * Copyright 2009-2013 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -41,13 +41,6 @@
     private static final long serialVersionUID = 1L;
 
     /*
-     * The dataverse and dataset names for the target feed dataset. This informaiton 
-     * is used in configuring partition constraints for the adapter. It is preferred that 
-     * the adapter location does not coincide with a partition location for the feed dataset.
-     */
-    private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
-
-    /*
      * Degree of parallelism for feed ingestion activity. Defaults to 1.
      * This builds up the count constraint for the ingestion operator.
      */
@@ -106,7 +99,7 @@
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx, partition);
+        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, partition, ctx);
     }
 
     @Override