synthetic adapter
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index f00deba..c0608d4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -63,6 +63,7 @@
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
 import edu.uci.ics.asterix.file.DatasetOperations;
 import edu.uci.ics.asterix.file.DataverseOperations;
 import edu.uci.ics.asterix.file.FeedOperations;
@@ -74,16 +75,18 @@
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.declared.FeedJobEventListenerFactory;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -1347,9 +1350,8 @@
             CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
                     .getValue(), bfs.getQuery(), bfs.getVarCounter());
 
-            Dataset dataset;
-            dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-                    .getDatasetName().getValue());
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
+                    dataverseName, bfs.getDatasetName().getValue());
             if (dataset == null) {
                 throw new AsterixException("Unknown dataset :" + bfs.getDatasetName().getValue());
             }
@@ -1361,21 +1363,16 @@
             bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
             cbfs.setQuery(bfs.getQuery());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-            compiled.setJobletEventListenerFactory(new FeedJobEventListenerFactory(dataset, compiled, hcc));
-
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            JobId jobId = null;
-            if (compiled != null) {
-                jobId = runJob(hcc, compiled, false);
-            }
+            runJob(hcc, compiled, false);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
             throw new AlgebricksException(e);
         } finally {
-            releaseReadLatch();
+            releaseWriteLatch();
         }
     }
 
@@ -1390,9 +1387,29 @@
             ControlFeedStatement cfs = (ControlFeedStatement) stmt;
             String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
                     : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
+            String datasetName = cfs.getDatasetName().getValue();
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
+                    dataverseName, cfs.getDatasetName().getValue());
+            if (dataset == null) {
+                throw new AsterixException("Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse "
+                        + dataverseName);
+            }
+            if (!dataset.getDatasetType().equals(DatasetType.FEED)) {
+                throw new AsterixException("Statement not applicable. Dataset " + cfs.getDatasetName().getValue()
+                        + " is not a " + DatasetType.FEED);
+            }
+
+            FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName,
+                    datasetName);
+
+            if (feedActivity == null || !FeedActivityType.FEED_BEGIN.equals(feedActivity.getFeedActivityType())) {
+                throw new AsterixException("Invalid operation. The feed is currently not " + FeedState.ACTIVE);
+            }
+
             CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
                     dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-            JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
+
+            JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider, feedActivity);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index f9bd2d5..ce34076 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.file;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Logger;
@@ -27,6 +26,7 @@
 import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage.MessageType;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -58,11 +58,12 @@
      * @throws AlgebricksException
      */
     public static JobSpecification buildControlFeedJobSpec(CompiledControlFeedStatement controlFeedStatement,
-            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+            AqlMetadataProvider metadataProvider, FeedActivity feedActivity) throws AsterixException,
+            AlgebricksException {
         switch (controlFeedStatement.getOperationType()) {
             case ALTER:
             case END: {
-                return createSendMessageToFeedJobSpec(controlFeedStatement, metadataProvider);
+                return createSendMessageToFeedJobSpec(controlFeedStatement, metadataProvider, feedActivity);
             }
             default: {
                 throw new AsterixException("Unknown Operation Type: " + controlFeedStatement.getOperationType());
@@ -72,14 +73,10 @@
     }
 
     private static JobSpecification createSendMessageToFeedJobSpec(CompiledControlFeedStatement controlFeedStatement,
-            AqlMetadataProvider metadataProvider) throws AsterixException {
+            AqlMetadataProvider metadataProvider, FeedActivity feedActivity) throws AsterixException {
         String dataverseName = controlFeedStatement.getDataverseName() == null ? metadataProvider
                 .getDefaultDataverseName() : controlFeedStatement.getDataverseName();
         String datasetName = controlFeedStatement.getDatasetName();
-        String datasetPath = dataverseName + File.separator + datasetName;
-
-        LOGGER.info(" DATASETPATH: " + datasetPath);
-
         Dataset dataset;
         try {
             dataset = metadataProvider.findDataset(dataverseName, datasetName);
@@ -110,7 +107,7 @@
         try {
             Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider.buildFeedMessengerRuntime(
                     metadataProvider, spec, (FeedDatasetDetails) dataset.getDatasetDetails(), dataverseName,
-                    datasetName, feedMessages);
+                    datasetName, feedMessages, feedActivity);
             feedMessenger = p.first;
             messengerPc = p.second;
         } catch (AlgebricksException e) {
@@ -121,9 +118,7 @@
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
-
         spec.addRoot(nullSink);
         return spec;
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 01cbca0..0faf95e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -16,9 +16,11 @@
 import edu.uci.ics.asterix.common.api.AsterixAppContextInfo;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
+import edu.uci.ics.asterix.metadata.declared.FeedJobLifecycleListener;
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -41,7 +43,7 @@
             LOGGER.info("Starting Asterix cluster controller");
         }
 
-        AsterixAppContextInfo.initialize(appCtx);
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
 
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
@@ -49,6 +51,9 @@
         AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
         MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
 
+        AsterixAppContextInfo.getInstance().getCCApplicationContext()
+                .addJobLifecycleListener(FeedJobLifecycleListener.INSTANCE);
+
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         setupWebServer(externalProperties);
         webServer.start();
diff --git a/asterix-common/pom.xml b/asterix-common/pom.xml
index 1a0e782..f37a662 100644
--- a/asterix-common/pom.xml
+++ b/asterix-common/pom.xml
@@ -110,6 +110,14 @@
 			<artifactId>hyracks-storage-am-lsm-common</artifactId>
 		</dependency>
 		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-common</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+		</dependency>
+		<dependency>
 			<groupId>edu.uci.ics.asterix</groupId>
 			<artifactId>asterix-test-framework</artifactId>
 			<version>0.0.6-SNAPSHOT</version>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfo.java
index 06a1c1f..1c92108 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfo.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixAppContextInfo.java
@@ -26,10 +26,10 @@
 import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 /*
  * Acts as an holder class for IndexRegistryProvider, AsterixStorageManager
  * instances that are accessed from the NCs. In addition an instance of ICCApplicationContext 
@@ -46,8 +46,9 @@
     private AsterixMetadataProperties metadataProperties;
     private AsterixStorageProperties storageProperties;
     private AsterixTransactionProperties txnProperties;
+    private IHyracksClientConnection hcc;
 
-    public static void initialize(ICCApplicationContext ccAppCtx) throws AsterixException {
+    public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) throws AsterixException {
         if (INSTANCE == null) {
             INSTANCE = new AsterixAppContextInfo(ccAppCtx);
         }
@@ -57,7 +58,7 @@
         INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
         INSTANCE.storageProperties = new AsterixStorageProperties(propertiesAccessor);
         INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
-
+        INSTANCE.hcc = hcc;
         Logger.getLogger("edu.uci.ics").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
 
@@ -108,4 +109,8 @@
     public AsterixExternalProperties getExternalProperties() {
         return externalProperties;
     }
+
+    public IHyracksClientConnection getHcc() {
+        return hcc;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..29d0c94 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -83,4 +83,8 @@
 		}
 		return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
 	}
+
+	public FeedId getFeedId() {
+		return feedId;
+	}
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index d0dbb98..4d10a81 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -30,95 +30,94 @@
 /**
  * The runtime for @see{FeedIntakeOperationDescriptor}
  */
-public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+public class FeedIntakeOperatorNodePushable extends
+		AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
-    private final IDatasourceAdapter adapter;
-    private final int partition;
-    private final IFeedManager feedManager;
-    private final FeedId feedId;
-    private final LinkedBlockingQueue<IFeedMessage> inbox;
-    private FeedInboxMonitor feedInboxMonitor;
+	private final IDatasourceAdapter adapter;
+	private final int partition;
+	private final IFeedManager feedManager;
+	private final FeedId feedId;
+	private final LinkedBlockingQueue<IFeedMessage> inbox;
+	private FeedInboxMonitor feedInboxMonitor;
 
-    public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, int partition) {
-        this.adapter = adapter;
-        this.partition = partition;
-        this.feedManager = (IFeedManager) FeedManager.INSTANCE;
-        this.feedId = feedId;
-        inbox = new LinkedBlockingQueue<IFeedMessage>();
-    }
+	public FeedIntakeOperatorNodePushable(FeedId feedId,
+			IDatasourceAdapter adapter, int partition) {
+		this.adapter = adapter;
+		this.partition = partition;
+		this.feedManager = (IFeedManager) FeedManager.INSTANCE;
+		this.feedId = feedId;
+		inbox = new LinkedBlockingQueue<IFeedMessage>();
+	}
 
-    @Override
-    public void open() throws HyracksDataException {
-        if (adapter instanceof IManagedFeedAdapter) {
-            feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
-            feedInboxMonitor.start();
-            feedManager.registerFeedMsgQueue(feedId, inbox);
-        }
-        writer.open();
-        try {
-            adapter.start(partition, writer);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-            /* 
-             we do not throw an exception, but allow the operator to close
-             gracefully throwing an exception here would result in a job abort and a
-             transaction roll back that undoes all the work done so far.
-             */
+	@Override
+	public void open() throws HyracksDataException {
+		if (adapter instanceof IManagedFeedAdapter) {
+			feedInboxMonitor = new FeedInboxMonitor(
+					(IManagedFeedAdapter) adapter, inbox, partition);
+			feedInboxMonitor.start();
+			feedManager.registerFeedMsgQueue(feedId, inbox);
+		}
+		writer.open();
+		try {
+			adapter.start(partition, writer);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new HyracksDataException(e);
+		} finally {
+			writer.close();
+			if (adapter instanceof IManagedFeedAdapter) {
+				feedManager.unregisterFeedMsgQueue(feedId, inbox);
+			}
+		}
+	}
 
-        } finally {
-            writer.close();
-            if (adapter instanceof IManagedFeedAdapter) {
-                feedManager.unregisterFeedMsgQueue(feedId, inbox);
-            }
-        }
-    }
+	@Override
+	public void fail() throws HyracksDataException {
+		writer.close();
+	}
 
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.close();
-    }
+	@Override
+	public void close() throws HyracksDataException {
 
-    @Override
-    public void close() throws HyracksDataException {
-        
-    }
+	}
 
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        // do nothing
-    }
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+		// do nothing
+	}
 }
 
 class FeedInboxMonitor extends Thread {
 
-    private LinkedBlockingQueue<IFeedMessage> inbox;
-    private final IManagedFeedAdapter adapter;
+	private LinkedBlockingQueue<IFeedMessage> inbox;
+	private final IManagedFeedAdapter adapter;
 
-    public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
-        this.inbox = inbox;
-        this.adapter = adapter;
-    }
+	public FeedInboxMonitor(IManagedFeedAdapter adapter,
+			LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+		this.inbox = inbox;
+		this.adapter = adapter;
+	}
 
-    @Override
-    public void run() {
-        while (true) {
-            try {
-                IFeedMessage feedMessage = inbox.take();
-                switch (feedMessage.getMessageType()) {
-                    case STOP:
-                        adapter.stop();
-                        break;
-                    case ALTER:
-                        adapter.alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
-                        break;
-                }
-            } catch (InterruptedException ie) {
-                break;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				IFeedMessage feedMessage = inbox.take();
+				switch (feedMessage.getMessageType()) {
+				case STOP:
+					adapter.stop();
+					break;
+				case ALTER:
+					adapter.alter(((AlterFeedMessage) feedMessage)
+							.getAlteredConfParams());
+					break;
+				}
+			} catch (InterruptedException ie) {
+				break;
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClientFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClientFactory.java
new file mode 100644
index 0000000..4fb14ff
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClientFactory.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface IFeedClientFactory {
+
+    public IPullBasedFeedClient createFeedClient(IHyracksTaskContext ctx, Map<String, String> configuration)
+            throws Exception;
+
+    public ARecordType getRecordType() throws AsterixException;
+
+    public FeedClientType getFeedClientType();
+
+    public enum FeedClientType {
+        GENERIC,
+        TYPED
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index a1eb075..26e4439 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -1,11 +1,18 @@
 package edu.uci.ics.asterix.external.dataset.adapter;
 
 import java.io.DataOutput;
+import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 
 public interface IPullBasedFeedClient {
 
+    public enum InflowState {
+        NO_MORE_DATA,
+        DATA_AVAILABLE,
+        DATA_NOT_AVAILABLE
+    }
+
     /**
      * Writes the next fetched tuple into the provided instance of DatatOutput.
      * 
@@ -15,7 +22,7 @@
      *         false if no record was written to the DataOutput instance indicating non-availability of new data.
      * @throws AsterixException
      */
-    public boolean nextTuple(DataOutput dataOutput) throws AsterixException;
+    public InflowState nextTuple(DataOutput dataOutput) throws AsterixException;
 
     /**
      * Provides logic for any corrective action that feed client needs to execute on
@@ -28,10 +35,8 @@
     public void resetOnFailure(Exception e) throws AsterixException;
 
     /**
-     * Terminates a feed, that is data ingestion activity ceases.
-     * 
-     * @throws Exception
+     * @param configuration
      */
-    public void stop() throws Exception;
+    public boolean alter(Map<String, String> configuration);
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 38686c2..97f9a80 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -15,8 +15,11 @@
 package edu.uci.ics.asterix.external.dataset.adapter;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -29,18 +32,27 @@
  * Captures the common logic for obtaining bytes from an external source
  * and packing them into frames as tuples.
  */
-public abstract class PullBasedAdapter extends AbstractDatasourceAdapter implements ITypedDatasourceAdapter {
+public abstract class PullBasedAdapter extends AbstractDatasourceAdapter implements ITypedDatasourceAdapter,
+        IManagedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
 
     protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
     protected IPullBasedFeedClient pullBasedFeedClient;
     protected ARecordType adapterOutputType;
     private FrameTupleAppender appender;
     private ByteBuffer frame;
+    protected boolean continueIngestion = true;
+    protected boolean alterRequested = false;
+    private Map<String, String> modifiedConfiguration = null;
 
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
+    public void alter(Map<String, String> modifedConfiguration) {
+        this.modifiedConfiguration = modifedConfiguration;
+    }
+
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
         appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -48,21 +60,35 @@
         appender.reset(frame, true);
 
         pullBasedFeedClient = getFeedClient(partition);
-        boolean moreData = false;
-        while (true) {
+        InflowState inflowState = null;
+        while (continueIngestion) {
             tupleBuilder.reset();
             try {
-                moreData = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput());
-                if (moreData) {
-                    tupleBuilder.addFieldEndOffset();
-                    appendTupleToFrame(writer);
-                } else {
-                    FrameUtils.flushFrame(frame, writer);
-                    break;
+                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput());
+                switch (inflowState) {
+                    case DATA_AVAILABLE:
+                        tupleBuilder.addFieldEndOffset();
+                        appendTupleToFrame(writer);
+                        break;
+                    case NO_MORE_DATA:
+                        LOGGER.info("REACHED END OF FEED");
+                        FrameUtils.flushFrame(frame, writer);
+                        continueIngestion = false;
+                        break;
+                    case DATA_NOT_AVAILABLE:
+                        break;
+                }
+                if (alterRequested) {
+                    boolean success = pullBasedFeedClient.alter(modifiedConfiguration);
+                    if (success) {
+                        configuration = modifiedConfiguration;
+                        modifiedConfiguration = null;
+                    }
                 }
             } catch (Exception failureException) {
                 try {
                     pullBasedFeedClient.resetOnFailure(failureException);
+                    tupleBuilder.reset();
                     continue;
                 } catch (Exception recoveryException) {
                     throw new Exception(recoveryException);
@@ -71,16 +97,6 @@
         }
     }
 
-    /**
-     * Allows an adapter to handle a runtime exception.
-     * @param e exception encountered during runtime
-     * @throws AsterixException
-     */
-    public void resetOnFailure(Exception e) throws AsterixException {
-        pullBasedFeedClient.resetOnFailure(e);
-        tupleBuilder.reset();
-    }
-
     private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
         if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
             FrameUtils.flushFrame(frame, writer);
@@ -97,4 +113,13 @@
         return adapterOutputType;
     }
 
+    /**
+     * Discontinue the ingestion of data and end the feed.
+     * 
+     * @throws Exception
+     */
+    public void stop() {
+        continueIngestion = false;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index 17ecd86..a0c613c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -17,46 +17,142 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutablePoint;
 import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.IACursor;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
 
     protected ARecordSerializerDeserializer recordSerDe;
     protected AMutableRecord mutableRecord;
     protected boolean messageReceived;
-    protected boolean continueIngestion=true;
+    protected boolean continueIngestion = true;
+    protected IARecordBuilder recordBuilder = new RecordBuilder();
 
-    public abstract boolean setNextRecord() throws Exception;
+    protected AMutableString aString = new AMutableString("");
+    protected AMutableInt32 aInt32 = new AMutableInt32(0);
+    protected AMutablePoint aPoint = new AMutablePoint(0, 0);
+    protected AMutableDateTime aDateTime = new AMutableDateTime(0);
+
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+
+    public abstract InflowState setNextRecord() throws Exception;
 
     @Override
-    public boolean nextTuple(DataOutput dataOutput) throws AsterixException {
+    public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
         try {
-            boolean newData = setNextRecord();
-            if (newData && continueIngestion) {
-                IAType t = mutableRecord.getType();
-                ATypeTag tag = t.getTypeTag();
-                try {
-                    dataOutput.writeByte(tag.serialize());
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                recordSerDe.serialize(mutableRecord, dataOutput);
-                return true;
+            InflowState state = setNextRecord();
+            boolean first = true;
+            switch (state) {
+                case DATA_AVAILABLE:
+                    IAType t = mutableRecord.getType();
+                    ATypeTag tag = t.getTypeTag();
+                    try {
+                        dataOutput.writeByte(tag.serialize());
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    if (first) {
+                        recordBuilder.reset(mutableRecord.getType());
+                        first = false;
+                    }
+                    recordBuilder.init();
+                    writeRecord(mutableRecord, dataOutput, recordBuilder);
+                    break;
+
+                case DATA_NOT_AVAILABLE:
+                    break;
+                case NO_MORE_DATA:
+                    break;
             }
-            return false;
+            return state;
         } catch (Exception e) {
             throw new AsterixException(e);
         }
 
     }
 
-    @Override
-    public void stop() {
-        continueIngestion = false;
+    @SuppressWarnings("unchecked")
+    private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
+            throws IOException, AsterixException {
+        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+        int numFields = record.getType().getFieldNames().length;
+        for (int pos = 0; pos < numFields; pos++) {
+            fieldValue.reset();
+            IAObject obj = record.getValueByPos(pos);
+            writeObject(obj, fieldValue.getDataOutput());
+            recordBuilder.addField(pos, fieldValue);
+        }
+        recordBuilder.write(dataOutput, false);
+    }
+
+    private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
+        switch (obj.getType().getTypeTag()) {
+            case RECORD:
+                ATypeTag tag = obj.getType().getTypeTag();
+                try {
+                    dataOutput.writeByte(tag.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+                IARecordBuilder recordBuilder = new RecordBuilder();
+                recordBuilder.reset((ARecordType) obj.getType());
+                recordBuilder.init();
+                writeRecord((AMutableRecord) obj, dataOutput, recordBuilder);
+                break;
+            case UNORDEREDLIST:
+                tag = obj.getType().getTypeTag();
+                try {
+                    dataOutput.writeByte(tag.serialize());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+                UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+                listBuilder.reset((AUnorderedListType) ((AMutableUnorderedList) obj).getType());
+                IACursor cursor = ((AMutableUnorderedList) obj).getCursor();
+                ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
+                while (cursor.next()) {
+                    listItemValue.reset();
+                    IAObject item = cursor.get();
+                    writeObject(item, listItemValue.getDataOutput());
+                    listBuilder.addItem(listItemValue);
+                }
+                listBuilder.write(dataOutput, false);
+                break;
+            default:
+                AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(obj.getType()).serialize(obj,
+                        dataOutput);
+                break;
+        }
     }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..ef69018 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -31,14 +30,11 @@
  */
 public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
 
-   
     private static final long serialVersionUID = 1L;
-    
+
     public static final String QUERY = "query";
     public static final String INTERVAL = "interval";
 
-    private boolean alterRequested = false;
-    private Map<String, String> alteredParams = new HashMap<String, String>();
     private ARecordType recordType;
 
     private PullBasedTwitterFeedClient tweetClient;
@@ -69,30 +65,6 @@
     }
 
     @Override
-    public void stop()  {
-        tweetClient.stop();
-    }
-
-    @Override
-    public void alter(Map<String, String> properties)  {
-        alterRequested = true;
-        this.alteredParams = properties;
-    }
-
-    public boolean isAlterRequested() {
-        return alterRequested;
-    }
-
-    public Map<String, String> getAlteredParams() {
-        return alteredParams;
-    }
-
-    public void postAlteration() {
-        alteredParams = null;
-        alterRequested = false;
-    }
-
-    @Override
     public ARecordType getAdapterOutputType() {
         return recordType;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..41dadfa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -88,11 +88,11 @@
     }
 
     @Override
-    public boolean setNextRecord() throws Exception {
+    public InflowState setNextRecord() throws Exception {
         Tweet tweet;
         tweet = getNextTweet();
         if (tweet == null) {
-            return false;
+            return InflowState.DATA_NOT_AVAILABLE;
         }
         int numFields = recordType.getFieldNames().length;
 
@@ -106,7 +106,7 @@
             mutableRecord.setValueAtPos(i, mutableFields[i]);
         }
         id++;
-        return true;
+        return InflowState.DATA_AVAILABLE;
     }
 
     @Override
@@ -114,4 +114,10 @@
         // TOOO: implement resetting logic for Twitter
     }
 
+    @Override
+    public boolean alter(Map<String, String> configuration) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 366b4af..21bef4a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -18,6 +18,7 @@
 import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
 import com.sun.syndication.feed.synd.SyndEntryImpl;
@@ -81,10 +82,10 @@
     }
 
     @Override
-    public boolean setNextRecord() throws Exception {
+    public InflowState setNextRecord() throws Exception {
         SyndEntryImpl feedEntry = getNextRSSFeed();
         if (feedEntry == null) {
-            return false;
+            return InflowState.DATA_NOT_AVAILABLE;
         }
         tupleFieldValues[0] = idPrefix + ":" + id;
         tupleFieldValues[1] = feedEntry.getTitle();
@@ -96,7 +97,7 @@
             mutableRecord.setValueAtPos(i, mutableFields[i]);
         }
         id++;
-        return true;
+        return InflowState.DATA_AVAILABLE;
     }
 
     private SyndEntryImpl getNextRSSFeed() throws Exception {
@@ -138,6 +139,12 @@
 
     }
 
+    @Override
+    public boolean alter(Map<String, String> configuration) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
 }
 
 class FetcherEventListenerImpl implements FetcherListener {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java
new file mode 100644
index 0000000..b90f897
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.asterix.external.feed.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FeedActivityIdFactory {
+	private static AtomicInteger id = new AtomicInteger();
+	private static boolean isInitialized = false;
+
+	public static boolean isInitialized() {
+		return isInitialized;
+	}
+
+	public static void initialize(int initialId) {
+		id.set(initialId);
+		isInitialized = true;
+	}
+
+	public static int generateFeedActivityId() {
+		return id.incrementAndGet();
+	}
+
+	public static int getMostRecentFeedActivityId() {
+		return id.get();
+	}
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 09ec2f7..73df2a8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -22,6 +22,7 @@
 
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.api.IMetadataManager;
 import edu.uci.ics.asterix.metadata.api.IMetadataNode;
@@ -29,6 +30,7 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
@@ -36,6 +38,7 @@
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 
 /**
  * Provides access to Asterix metadata via remote methods to the metadata node.
@@ -86,6 +89,7 @@
     private IMetadataNode metadataNode;
     private final ReadWriteLock metadataLatch;
     private final AsterixMetadataProperties metadataProperties;
+    private IHyracksClientConnection hcc;
 
     public MetadataManager(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) {
         if (proxy == null) {
@@ -594,6 +598,29 @@
     }
 
     @Override
+    public void registerFeedActivity(MetadataTransactionContext ctx, FeedId feedId, FeedActivity feedActivity)
+            throws MetadataException {
+        try {
+            metadataNode.registerFeedActivity(ctx.getJobId(), feedId, feedActivity);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
+            throws MetadataException {
+
+        FeedActivity feedActivity = null;
+        try {
+            feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), new FeedId(dataverseName, datasetName));
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        return feedActivity;
+    }
+
+    @Override
     public void acquireWriteLatch() {
         metadataLatch.writeLock().lock();
     }
@@ -612,4 +639,5 @@
     public void releaseReadLatch() {
         metadataLatch.readLock().unlock();
     }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 5bdf086..d363d01 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -17,6 +17,7 @@
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
@@ -24,6 +25,8 @@
 import edu.uci.ics.asterix.common.context.AsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
 import edu.uci.ics.asterix.metadata.api.IMetadataNode;
@@ -34,6 +37,7 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -43,6 +47,7 @@
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
+import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
@@ -1126,4 +1131,62 @@
     public int getMostRecentDatasetId() throws MetadataException, RemoteException {
         return DatasetIdFactory.getMostRecentDatasetId();
     }
+
+    @Override
+    public void registerFeedActivity(JobId jobId, FeedId feedId, FeedActivity feedActivity) throws MetadataException,
+            RemoteException {
+        try {
+            if (!FeedActivityIdFactory.isInitialized()) {
+                initializeFeedActivityIdFactory(jobId);
+            }
+            feedActivity.setActivityId(FeedActivityIdFactory.generateFeedActivityId());
+            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
+            ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(feedActivity);
+            insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, tuple);
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+
+    }
+
+    @Override
+    public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId) throws MetadataException, RemoteException {
+        try {
+            ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getDataset());
+            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
+            List<FeedActivity> results = new ArrayList<FeedActivity>();
+            IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
+                    tupleReaderWriter);
+            searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
+            if (!results.isEmpty()) {
+                Collections.sort(results);
+                return results.get(results.size() - 1);
+            }
+            return null;
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException {
+        try {
+            ITupleReference searchKey = createTuple();
+            FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
+            List<FeedActivity> results = new ArrayList<FeedActivity>();
+            IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
+                    tupleReaderWriter);
+            searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
+            int maxActivityId = 0;
+            for (FeedActivity fa : results) {
+                if (maxActivityId < fa.getActivityId()) {
+                    maxActivityId = fa.getActivityId();
+                }
+            }
+            FeedActivityIdFactory.initialize(maxActivityId);
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index f11144e..5eff5b3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -19,18 +19,20 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 
 /**
  * A metadata manager provides user access to Asterix metadata (e.g., types,
@@ -128,7 +130,7 @@
      * @throws MetadataException
      */
     List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws MetadataException;
-    
+
     /**
      * Retrieves a dataverse with given name.
      * 
@@ -179,7 +181,7 @@
      *             For example, if the dataset already exists.
      */
     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException;
-    
+
     /**
      * Retrieves a dataset within a given dataverse.
      * 
@@ -441,10 +443,29 @@
     public List<Function> getDataverseFunctions(MetadataTransactionContext ctx, String dataverseName)
             throws MetadataException;
 
+    /**
+     * @param ctx
+     * @param feedId
+     * @param feedActivity
+     * @throws MetadataException
+     */
+    public void registerFeedActivity(MetadataTransactionContext ctx, FeedId feedId, FeedActivity feedActivity)
+            throws MetadataException;
+
+    /**
+     * @param ctx
+     * @param dataverseName
+     * @param datasetName
+     * @return
+     * @throws MetadataException
+     */
+    public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
+            throws MetadataException;
+
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
-    
+
     public int getMostRecentDatasetId() throws MetadataException;
-    
+
     public void acquireWriteLatch();
 
     public void releaseWriteLatch();
@@ -453,5 +474,4 @@
 
     public void releaseReadLatch();
 
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index e0b5e96..e58a2f0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -21,11 +21,13 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
@@ -383,7 +385,6 @@
     public void addNode(JobId jobId, Node node) throws MetadataException, RemoteException;
 
     /**
-
      * @param jobId
      *            A globally unique id for an active metadata transaction.
      * @param functionSignature
@@ -408,7 +409,8 @@
      *             group to be deleted.
      * @throws RemoteException
      */
-    public void dropFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException, RemoteException;
+    public void dropFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
+            RemoteException;
 
     /**
      * @param jobId
@@ -442,8 +444,8 @@
     public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName) throws MetadataException,
             RemoteException;
 
-    public DatasourceAdapter getAdapter(JobId jobId, String dataverseName, String adapterName) throws MetadataException,
-            RemoteException;
+    public DatasourceAdapter getAdapter(JobId jobId, String dataverseName, String adapterName)
+            throws MetadataException, RemoteException;
 
     /**
      * Deletes a adapter , acquiring local locks on behalf of the given
@@ -472,8 +474,34 @@
      */
     public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException;
 
+    /**
+     * @param jobId
+     * @param feedId
+     * @return
+     * @throws MetadataException
+     * @throws RemoteException
+     */
+    public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId) throws MetadataException, RemoteException;
+
     public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
-    
+
+    /**
+     * @param jobId
+     * @throws MetadataException
+     * @throws RemoteException
+     */
+    public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException;
+
     public int getMostRecentDatasetId() throws MetadataException, RemoteException;
 
+    /**
+     * @param jobId
+     *            A globally unique id for an active metadata transaction.
+     * @param feedId
+     *            A unique id for the feed
+     * @param feedActivity
+     */
+    public void registerFeedActivity(JobId jobId, FeedId feedId, FeedActivity feedActivity) throws MetadataException,
+            RemoteException;
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 4d2faf3..b222fb3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -95,8 +95,9 @@
                 MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
                         1 });
 
-        FEED_ACTIVITY_DATASET = new MetadataIndex("FeedActivity", null, 3, new IAType[] { BuiltinType.ASTRING,
-                BuiltinType.ASTRING }, new String[] { "DataverseName", "DatasetName" }, 0,
-                MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE, FEED_ACTIVITY_DATASET_ID, true, new int[] { 0, 1 });
+        FEED_ACTIVITY_DATASET = new MetadataIndex("FeedActivity", null, 4, new IAType[] { BuiltinType.ASTRING,
+                BuiltinType.ASTRING, BuiltinType.AINT32 },
+                new String[] { "DataverseName", "DatasetName", "ActivityId" }, 0,
+                MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE, FEED_ACTIVITY_DATASET_ID, true, new int[] { 0, 1, 2 });
     }
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 5b67102..992fbd3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -367,17 +367,18 @@
     // FeedActivityRecordType.
     public static final int FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
-    public static final int FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX = 2;
-    public static final int FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX = 3;
-    public static final int FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX = 4;
-    public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 5;
+    public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX = 2;
+    public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 3;
+    public static final int FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX = 4;
+    public static final int FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX = 5;
+    public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6;
 
     private static ARecordType createFeedActivityRecordType() throws AsterixException {
         AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
-        String[] fieldNames = { "DataverseName", "DatasetName", "Status", "IngestNodes", "ComputeNodes",
-                "UpdateTimestamp" };
-        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, unorderedListType,
-                unorderedListType, BuiltinType.ASTRING };
+        String[] fieldNames = { "DataverseName", "DatasetName", "ActivityId", "ActivityType", "IngestNodes",
+                "ComputeNodes", "UpdateTimestamp" };
+        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32, BuiltinType.ASTRING,
+                unorderedListType, unorderedListType, BuiltinType.ASTRING };
         return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 4773ed0..c974dcd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -56,6 +56,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -99,6 +100,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -148,6 +150,7 @@
     private FileSplit outputFile;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+    private IHyracksClientConnection hcc;
 
     private final Dataverse defaultDataverse;
     private JobId jobId;
@@ -461,12 +464,13 @@
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
             AqlMetadataProvider metadataProvider, JobSpecification jobSpec, FeedDatasetDetails datasetDetails,
-            String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadataProvider
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverse, dataset, dataset);
+            String dataverse, String dataset, List<IFeedMessage> feedMessages, FeedActivity feedActivity)
+            throws AlgebricksException {
+        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(feedActivity
+                .getIngestNodes().toArray(new String[] {}));
         FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
                 feedMessages);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java
deleted file mode 100644
index 3e7510f..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package edu.uci.ics.asterix.metadata.declared;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.IJobletEventListener;
-import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobInfo;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-
-public class FeedJobEventListenerFactory implements IJobletEventListenerFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    private final Dataset dataset;
-    private final JobSpecification jobSpec;
-    private final IHyracksClientConnection hcc;
-    private IJobletEventListener jobEventListener;
-
-    public FeedJobEventListenerFactory(Dataset dataset, JobSpecification jobSpec, IHyracksClientConnection hcc) {
-        this.dataset = dataset;
-        this.jobSpec = jobSpec;
-        this.hcc = hcc;
-    }
-
-    @Override
-    public IJobletEventListener createListener(IHyracksJobletContext ctx) {
-
-        jobEventListener = new IJobletEventListener() {
-
-            private final List<String> ingestLocations = new ArrayList<String>();
-            private final List<String> computeLocations = new ArrayList<String>();
-
-            @Override
-            public void jobletStart(JobId jobId) {
-                List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
-                List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
-
-                Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
-                for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-                    if (entry.getValue() instanceof FeedIntakeOperatorDescriptor) {
-                        ingestOperatorIds.add(entry.getKey());
-                    } else if (entry.getValue() instanceof AlgebricksMetaOperatorDescriptor) {
-                        computeOperatorIds.add(entry.getKey());
-                    }
-                }
-
-                try {
-                    JobInfo info = hcc.getJobInfo(jobId);
-
-                    for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
-                        ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
-                    }
-                    for (OperatorDescriptorId computeOpId : computeOperatorIds) {
-                        computeLocations.addAll(info.getOperatorLocations().get(computeOpId));
-                    }
-                    System.out.println("job info " + info);
-
-                } catch (Exception e) {
-                    // TODO Add Exception handling here
-                }
-            }
-
-            @Override
-            public void jobletFinish(JobStatus status) {
-                MetadataManager.INSTANCE.acquireWriteLatch();
-                MetadataTransactionContext mdTxnCtx = null;
-                try {
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    FeedDatasetDetails feedDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
-                    feedDetails.setFeedState(FeedState.INACTIVE);
-                    feedDetails.setComputeNodes(null);
-                    feedDetails.setIngestNodes(null);
-                    MetadataManager.INSTANCE
-                            .dropDataset(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName());
-                    MetadataManager.INSTANCE.addDataset(mdTxnCtx, dataset);
-                } catch (Exception e) {
-
-                }
-            }
-
-            public List<String> getIngestLocations() {
-                return ingestLocations;
-            }
-
-            public List<String> getComputeLocations() {
-                return computeLocations;
-            }
-        };
-        return jobEventListener;
-    }
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java
new file mode 100644
index 0000000..0d044fe
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.declared;
+
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.common.api.AsterixAppContextInfo;
+import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+
+public class FeedJobLifecycleListener implements IJobLifecycleListener, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static FeedJobLifecycleListener INSTANCE = new FeedJobLifecycleListener();
+
+    private LinkedBlockingQueue<Message> inbox;
+
+    private FeedJobLifecycleListener() {
+        inbox = new LinkedBlockingQueue<Message>();
+        feedJobNotificationHandler = new FeedJobNotificationHandler(inbox);
+        new Thread(feedJobNotificationHandler).start();
+    }
+
+    private final FeedJobNotificationHandler feedJobNotificationHandler;
+
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+        if (feedJobNotificationHandler.isRegisteredFeed(jobId)) {
+            inbox.add(new Message(jobId, Message.MessageKind.JOB_START));
+        }
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId) throws HyracksException {
+        if (feedJobNotificationHandler.isRegisteredFeed(jobId)) {
+            inbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
+        }
+    }
+
+    @Override
+    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+        IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, AsterixAppContextInfo
+                .getInstance().getCCApplicationContext(), EnumSet.noneOf(JobFlag.class));
+        JobSpecification spec = acgg.getJobSpecification();
+        boolean feedIngestionJob = false;
+        FeedId feedId = null;
+        for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+            if (!(opDesc instanceof FeedIntakeOperatorDescriptor)) {
+                continue;
+            }
+            feedId = ((FeedIntakeOperatorDescriptor) opDesc).getFeedId();
+            feedIngestionJob = true;
+            break;
+        }
+        if (feedIngestionJob) {
+            feedJobNotificationHandler.registerFeed(feedId, jobId, spec);
+        }
+    }
+
+    private static class Message {
+        public JobId jobId;
+
+        public enum MessageKind {
+            JOB_START,
+            JOB_FINISH
+        }
+
+        public MessageKind messageKind;
+
+        public Message(JobId jobId, MessageKind msgKind) {
+            this.jobId = jobId;
+            this.messageKind = msgKind;
+        }
+    }
+
+    private static class FeedJobNotificationHandler implements Runnable, Serializable {
+
+        private static final long serialVersionUID = 1L;
+        private LinkedBlockingQueue<Message> inbox;
+        private Map<JobId, FeedInfo> registeredFeeds = new HashMap<JobId, FeedInfo>();
+
+        public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
+            this.inbox = inbox;
+        }
+
+        public boolean isRegisteredFeed(JobId jobId) {
+            return registeredFeeds.containsKey(jobId);
+        }
+
+        public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec) {
+            if (registeredFeeds.containsKey(jobId)) {
+                throw new IllegalStateException(" Feed already registered ");
+            }
+            registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec));
+        }
+
+        @Override
+        public void run() {
+            Message mesg;
+            while (true) {
+                try {
+                    mesg = inbox.take();
+                    FeedInfo feedInfo = registeredFeeds.get(mesg.jobId);
+                    switch (mesg.messageKind) {
+                        case JOB_START:
+                            handleJobStartMessage(feedInfo, mesg);
+                            break;
+                        case JOB_FINISH:
+                            handleJobFinishMessage(feedInfo, mesg);
+                            break;
+                    }
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+            }
+        }
+
+        private void handleJobStartMessage(FeedInfo feedInfo, Message message) {
+            JobSpecification jobSpec = feedInfo.jobSpec;
+
+            List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
+            List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+
+            Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+            for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+                if (entry.getValue() instanceof AlgebricksMetaOperatorDescriptor) {
+                    AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) entry.getValue());
+                    IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
+                    for (IPushRuntimeFactory rf : runtimeFactories) {
+                        if (rf instanceof EmptyTupleSourceRuntimeFactory) {
+                            ingestOperatorIds.add(entry.getKey());
+                        } else if (rf instanceof AssignRuntimeFactory) {
+                            computeOperatorIds.add(entry.getKey());
+                        }
+                    }
+                }
+            }
+
+            try {
+                IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+                JobInfo info = hcc.getJobInfo(message.jobId);
+
+                for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
+                    feedInfo.ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
+                }
+                for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+                    List<String> locations = info.getOperatorLocations().get(computeOpId);
+                    if (locations != null) {
+                        feedInfo.computeLocations.addAll(locations);
+                    } else {
+                        feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
+                    }
+                }
+                FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
+                        feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedInfo.ingestLocations,
+                        feedInfo.computeLocations);
+
+                MetadataManager.INSTANCE.acquireWriteLatch();
+                MetadataTransactionContext mdTxnCtx = null;
+                try {
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedId(feedInfo.feedId.getDataverse(),
+                            feedInfo.feedId.getDataset()), feedActivity);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e) {
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                } finally {
+                    MetadataManager.INSTANCE.releaseWriteLatch();
+                }
+            } catch (Exception e) {
+                // TODO Add Exception handling here
+            }
+        }
+
+        private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
+            MetadataManager.INSTANCE.acquireWriteLatch();
+            MetadataTransactionContext mdTxnCtx = null;
+
+            try {
+                IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+                JobInfo info = hcc.getJobInfo(message.jobId);
+                JobStatus status = info.getPendingStatus();
+                Exception e;
+                boolean failure = status != null && status.equals(JobStatus.FAILURE);
+                FeedActivityType activityType = FeedActivityType.FEED_END;
+                if (failure) {
+                    e = info.getPendingException();
+                    activityType = FeedActivityType.FEED_FAILURE;
+                }
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
+                        feedInfo.feedId.getDataset(), activityType, feedInfo.ingestLocations, feedInfo.computeLocations);
+                MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedId(feedInfo.feedId.getDataverse(),
+                        feedInfo.feedId.getDataset()), feedActivity);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (RemoteException | ACIDException | MetadataException e) {
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                } catch (RemoteException | ACIDException ae) {
+                    throw new IllegalStateException(" Unable to abort ");
+                }
+            } catch (Exception e) {
+                // add exception handling here
+            } finally {
+                MetadataManager.INSTANCE.releaseWriteLatch();
+            }
+        }
+
+    }
+
+    private static class FeedInfo {
+        public FeedId feedId;
+        public JobSpecification jobSpec;
+        public List<String> ingestLocations = new ArrayList<String>();
+        public List<String> computeLocations = new ArrayList<String>();
+
+        public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
+            this.feedId = feedId;
+            this.jobSpec = jobSpec;
+        }
+
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index 97f7a1e..eb2673f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -17,32 +17,41 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.metadata.IDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataCache;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
 
 /**
  * Metadata describing a feed activity record.
  */
-public class FeedActivity implements IMetadataEntity {
+public class FeedActivity implements IMetadataEntity, Comparable<FeedActivity> {
 
     private static final long serialVersionUID = 1L;
 
+    private int activityId;
+
     private final String dataverseName;
     // Enforced to be unique within a dataverse.
     private final String datasetName;
 
-    private FeedState feedState;
     private List<String> ingestNodes;
     private List<String> computeNodes;
     private String lastUpdatedTimestamp;
+    private FeedActivityType activityType;
 
-    public FeedActivity(String dataverseName, String datasetName, String feedState, List<String> ingestNodes,
-            List<String> computeNodes) {
+    public static enum FeedActivityType {
+        FEED_BEGIN,
+        FEED_END,
+        FEED_FAILURE,
+        FEED_STATS,
+        FEED_EXPAND,
+        FEED_SHRINK
+    }
+
+    public FeedActivity(String dataverseName, String datasetName, FeedActivityType feedActivityType,
+            List<String> ingestNodes, List<String> computeNodes) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
-        this.feedState = FeedState.valueOf(feedState);
+        this.activityType = feedActivityType;
         this.ingestNodes = ingestNodes;
         this.computeNodes = computeNodes;
     }
@@ -83,12 +92,12 @@
         return true;
     }
 
-    public FeedState getFeedState() {
-        return feedState;
+    public FeedActivityType getFeedActivityType() {
+        return activityType;
     }
 
-    public void setFeedState(FeedState feedState) {
-        this.feedState = feedState;
+    public void setFeedActivityType(FeedActivityType feedActivityType) {
+        this.activityType = feedActivityType;
     }
 
     public List<String> getIngestNodes() {
@@ -114,4 +123,17 @@
     public void setLastUpdatedTimestamp(String lastUpdatedTimestamp) {
         this.lastUpdatedTimestamp = lastUpdatedTimestamp;
     }
+
+    public int getActivityId() {
+        return activityId;
+    }
+
+    public void setActivityId(int activityId) {
+        this.activityId = activityId;
+    }
+
+    @Override
+    public int compareTo(FeedActivity o) {
+        return this.activityId - o.getActivityId();
+    }
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/AbstractTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/AbstractTupleTranslator.java
index 377efc7..c3f79d0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/AbstractTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/AbstractTupleTranslator.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntityTupleTranslator;
 import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.types.BuiltinType;
@@ -40,6 +41,10 @@
     @SuppressWarnings("unchecked")
     protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+
     protected final IARecordBuilder recordBuilder;
     protected final ArrayBackedValueStorage fieldValue;
     protected final ArrayTupleBuilder tupleBuilder;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
index a1acd38..e81eb3e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
 import edu.uci.ics.asterix.om.base.ARecord;
@@ -46,13 +47,16 @@
  * Translates a Dataset metadata entity to an ITupleReference and vice versa.
  */
 public class FeedActivityTupleTranslator extends AbstractTupleTranslator<FeedActivity> {
-    // Field indexes of serialized Dataset in a tuple.
-    // First key field.
-    public static final int FEED_ACTIVITY_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
-    // Second key field.
-    public static final int FEED_ACTIVITY_DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1;
-    // Payload field containing serialized Dataset.
-    public static final int FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+    // Field indexes of serialized FeedActivity in a tuple.
+    // Key field.
+    public static final int FEED_ACTIVITY_ACTIVITY_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+    public static final int FEED_ACTIVITY_ACTIVITY_DATASET_NAME_FIELD_INDEX = 1;
+
+    public static final int FEED_ACTIVITY_ACTIVITY_ID_FIELD_INDEX = 2;
+
+    // Payload field containing serialized FeedActivity.
+    public static final int FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX = 3;
 
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
@@ -62,7 +66,7 @@
 
     @SuppressWarnings("unchecked")
     public FeedActivityTupleTranslator(boolean getTuple) {
-        super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
+        super(getTuple, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET.getFieldCount());
         aInt32 = new AMutableInt32(-1);
         aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     }
@@ -84,8 +88,10 @@
                 .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
         String datasetName = ((AString) feedActivityRecord
                 .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
-        String status = ((AString) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX)).getStringValue();
+        int activityId = ((AInt32) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX)).getIntegerValue();
+        String feedActivityType = ((AString) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX)).getStringValue();
 
         List<String> ingestNodes = new ArrayList<String>();
         IACursor cursor = ((AUnorderedList) feedActivityRecord
@@ -101,22 +107,30 @@
             computeNodes.add(((AString) cursor.get()).getStringValue());
         }
 
-        return new FeedActivity(dataverseName, datasetName, status, ingestNodes, computeNodes);
+        FeedActivity fa = new FeedActivity(dataverseName, datasetName, FeedActivityType.valueOf(feedActivityType),
+                ingestNodes, computeNodes);
+        fa.setActivityId(activityId);
+        return fa;
     }
 
     @Override
     public ITupleReference getTupleFromMetadataEntity(FeedActivity feedActivity) throws IOException, MetadataException {
-        // write the key in the first 2 fields of the tuple
+        // write the key in the first three fields of the tuple
         ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
+
         tupleBuilder.reset();
         aString.setValue(feedActivity.getDataverseName());
         stringSerde.serialize(aString, tupleBuilder.getDataOutput());
         tupleBuilder.addFieldEndOffset();
+
         aString.setValue(feedActivity.getDatasetName());
         stringSerde.serialize(aString, tupleBuilder.getDataOutput());
         tupleBuilder.addFieldEndOffset();
 
-        // write the pay-load in the third field of the tuple
+        aInt32.setValue(feedActivity.getActivityId());
+        int32Serde.serialize(aInt32, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        // write the pay-load in the 2nd field of the tuple
 
         recordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
 
@@ -134,11 +148,17 @@
 
         // write field 2
         fieldValue.reset();
-        aString.setValue(feedActivity.getFeedState().name());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX, fieldValue);
+        aInt32.setValue(feedActivity.getActivityId());
+        int32Serde.serialize(aInt32, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX, fieldValue);
 
         // write field 3
+        fieldValue.reset();
+        aString.setValue(feedActivity.getFeedActivityType().name());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX, fieldValue);
+
+        // write field 4
         UnorderedListBuilder listBuilder = new UnorderedListBuilder();
         listBuilder
                 .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX]);
@@ -152,7 +172,7 @@
         listBuilder.write(fieldValue.getDataOutput(), true);
         recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
 
-        // write field 4
+        // write field 5
         listBuilder
                 .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
         for (String field : feedActivity.getIngestNodes()) {
@@ -165,7 +185,7 @@
         listBuilder.write(fieldValue.getDataOutput(), true);
         recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
 
-        // write field 5
+        // write field 6
         fieldValue.reset();
         aString.setValue(Calendar.getInstance().getTime().toString());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/base/AMutableUnorderedList.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/base/AMutableUnorderedList.java
index 71c3986..375efa3 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/base/AMutableUnorderedList.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/base/AMutableUnorderedList.java
@@ -20,15 +20,20 @@
 
 public final class AMutableUnorderedList extends AUnorderedList {
 
-    public AMutableUnorderedList(AUnorderedListType type) {
-        super(type);
-    }
+	public AMutableUnorderedList(AUnorderedListType type) {
+		super(type);
+	}
 
-    public AMutableUnorderedList(AUnorderedListType type, ArrayList<IAObject> sequence) {
-        super(type, sequence);
-    }
+	public AMutableUnorderedList(AUnorderedListType type,
+			ArrayList<IAObject> sequence) {
+		super(type, sequence);
+	}
 
-    public void add(IAObject obj) {
-        values.add(obj);
-    }
+	public void add(IAObject obj) {
+		values.add(obj);
+	}
+
+	public void clear() {
+		values.clear();
+	}
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
new file mode 100644
index 0000000..8996a85
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -0,0 +1,2475 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class DataGenerator {
+
+    private static RandomDateGenerator randDateGen;
+    private static RandomNameGenerator randNameGen;
+    private static RandomEmploymentGenerator randEmpGen;
+    private static RandomMessageGenerator randMessageGen;
+    private static RandomLocationGenerator randLocationGen;
+
+    private static DistributionHandler fbDistHandler;
+    private static DistributionHandler twDistHandler;
+
+    private static int totalFbMessages;
+    private static int numFbOnlyUsers;
+    private static int totalTwMessages;
+    private static int numTwOnlyUsers;
+
+    private static int numCommonUsers;
+
+    private static int fbUserId;
+    private static int twUserId;
+
+    private static int fbMessageId;
+    private static int twMessageId;
+
+    private static Random random = new Random();
+
+    private static String commonUserFbSuffix = "_fb";
+    private static String commonUserTwSuffix = "_tw";
+
+    private static String outputDir;
+
+    private static PartitionConfiguration partition;
+
+    private static FacebookUser fbUser = new FacebookUser();
+    private static TwitterUser twUser = new TwitterUser();
+
+    private static FacebookMessage fbMessage = new FacebookMessage();
+    private static TweetMessage twMessage = new TweetMessage();
+
+    private static int duration;
+
+    private static DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
+    private static void generateFacebookOnlyUsers(int numFacebookUsers) throws IOException {
+        FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, true);
+        FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, true);
+
+        for (int i = 0; i < numFacebookUsers; i++) {
+            getFacebookUser(null);
+            appender.appendToFile(fbUser.toString());
+            generateFacebookMessages(fbUser, messageAppender, -1);
+        }
+        appender.close();
+        messageAppender.close();
+    }
+
+    private static void generateTwitterOnlyUsers(int numTwitterUsers) throws IOException {
+        FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, true);
+        FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, true);
+
+        for (int i = 0; i < numTwitterUsers; i++) {
+            getTwitterUser(null);
+            appender.appendToFile(twUser.toString());
+            generateTwitterMessages(twUser, messageAppender, -1);
+        }
+        appender.close();
+        messageAppender.close();
+    }
+
+    private static void generateCommonUsers() throws IOException {
+        FileAppender fbAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, false);
+        FileAppender twAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, false);
+        FileAppender fbMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, false);
+        FileAppender twMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, false);
+
+        for (int i = 0; i < numCommonUsers; i++) {
+            getFacebookUser(commonUserFbSuffix);
+            fbAppender.appendToFile(fbUser.toString());
+            generateFacebookMessages(fbUser, fbMessageAppender, -1);
+
+            getCorrespondingTwitterUser(fbUser);
+            twAppender.appendToFile(twUser.toString());
+            generateTwitterMessages(twUser, twMessageAppender, -1);
+        }
+
+        fbAppender.close();
+        twAppender.close();
+        fbMessageAppender.close();
+        twMessageAppender.close();
+    }
+
+    private static void generateFacebookMessages(FacebookUser user, FileAppender appender, int numMsg)
+            throws IOException {
+        Message message;
+        int numMessages = 0;
+        if (numMsg == -1) {
+            numMessages = fbDistHandler
+                    .getFromDistribution(fbUserId - partition.getTargetPartition().getFbUserKeyMin());
+        }
+        for (int i = 0; i < numMessages; i++) {
+            message = randMessageGen.getNextRandomMessage();
+            Point location = randLocationGen.getRandomPoint();
+            fbMessage.reset(fbMessageId++, user.getId(), random.nextInt(totalFbMessages + 1), location, message);
+            appender.appendToFile(fbMessage.toString());
+        }
+    }
+
+    private static void generateTwitterMessages(TwitterUser user, FileAppender appender, int numMsg) throws IOException {
+        Message message;
+        int numMessages = 0;
+        if (numMsg == -1) {
+            numMessages = twDistHandler
+                    .getFromDistribution(twUserId - partition.getTargetPartition().getTwUserKeyMin());
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            message = randMessageGen.getNextRandomMessage();
+            Point location = randLocationGen.getRandomPoint();
+            DateTime sendTime = randDateGen.getNextRandomDatetime();
+            twMessage.reset(twMessageId + "", user, location, sendTime, message.getReferredTopics(), message);
+            twMessageId++;
+            appender.appendToFile(twMessage.toString());
+        }
+    }
+
+    public static Iterator<TweetMessage> getTwitterMessageIterator() {
+        return new TweetMessageIterator(duration);
+    }
+
+    public static class TweetMessageIterator implements Iterator<TweetMessage> {
+
+        private final int duration;
+        private long startTime = 0;
+
+        public TweetMessageIterator(int duration) {
+            this.duration = duration;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (startTime == 0) {
+                startTime = System.currentTimeMillis();
+            }
+            return System.currentTimeMillis() - startTime < duration * 1000;
+        }
+
+        @Override
+        public TweetMessage next() {
+            getTwitterUser(null);
+            Message message = randMessageGen.getNextRandomMessage();
+            Point location = randLocationGen.getRandomPoint();
+            DateTime sendTime = randDateGen.getNextRandomDatetime();
+            twMessage.reset(twMessageId + "", twUser, location, sendTime, message.getReferredTopics(), message);
+            twMessageId++;
+            if (twUserId > numTwOnlyUsers) {
+                twUserId = 1;
+            }
+            return twMessage;
+
+        }
+
+        @Override
+        public void remove() {
+            // TODO Auto-generated method stub
+
+        }
+
+    }
+
+    public static class InitializationInfo {
+        public Date startDate = new Date(1, 1, 2005);
+        public Date endDate = new Date(8, 20, 2012);
+        public String[] lastNames = DataGenerator.lastNames;
+        public String[] firstNames = DataGenerator.firstNames;
+        public String[] vendors = DataGenerator.vendors;
+        public String[] jargon = DataGenerator.jargon;
+        public String[] org_list = DataGenerator.org_list;
+        public int percentEmployed = 90;
+        public Date employmentStartDate = new Date(1, 1, 2000);
+        public Date employmentEndDate = new Date(31, 12, 2012);
+        public int totalFbMessages;
+        public int numFbOnlyUsers;
+        public int totalTwMessages;
+        public int numTwOnlyUsers = 5000;
+        public int numCommonUsers;
+        public int fbUserIdMin;
+        public int fbMessageIdMin;
+        public int twUserIdMin;
+        public int twMessageIdMin;
+        public int timeDurationInSecs = 60;
+
+    }
+
+    public static void initialize(InitializationInfo info) {
+        randDateGen = new RandomDateGenerator(info.startDate, info.endDate);
+        randNameGen = new RandomNameGenerator(info.firstNames, info.lastNames);
+        randEmpGen = new RandomEmploymentGenerator(info.percentEmployed, info.employmentStartDate,
+                info.employmentEndDate, info.org_list);
+        randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
+        randMessageGen = new RandomMessageGenerator(info.vendors, info.jargon);
+        fbDistHandler = new DistributionHandler(info.totalFbMessages, 0.5, info.numFbOnlyUsers + info.numCommonUsers);
+        twDistHandler = new DistributionHandler(info.totalTwMessages, 0.5, info.numTwOnlyUsers + info.numCommonUsers);
+        fbUserId = info.fbUserIdMin;
+        twUserId = info.twUserIdMin;
+
+        fbMessageId = info.fbMessageIdMin;
+        twMessageId = info.fbMessageIdMin;
+        duration = info.timeDurationInSecs;
+    }
+
+    public static void main(String args[]) throws Exception {
+
+        String controllerInstallDir = null;
+        if (args.length < 2) {
+            printUsage();
+            System.exit(1);
+        } else {
+            controllerInstallDir = args[0];
+            String partitionConfXML = controllerInstallDir + "/output/partition-conf.xml";
+            String partitionName = args[1];
+            partition = XMLUtil.getPartitionConfiguration(partitionConfXML, partitionName);
+        }
+
+        // 1
+        randDateGen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
+
+        String firstNameFile = controllerInstallDir + "/metadata/firstNames.txt";
+        String lastNameFile = controllerInstallDir + "/metadata/lastNames.txt";
+        String vendorFile = controllerInstallDir + "/metadata/vendors.txt";
+        String jargonFile = controllerInstallDir + "/metadata/jargon.txt";
+        String orgList = controllerInstallDir + "/metadata/org_list.txt";
+
+        randNameGen = new RandomNameGenerator(firstNameFile, lastNameFile);
+        randEmpGen = new RandomEmploymentGenerator(90, new Date(1, 1, 2000), new Date(8, 20, 2012), orgList);
+        randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
+        randMessageGen = new RandomMessageGenerator(vendorFile, jargonFile);
+
+        totalFbMessages = partition.getTargetPartition().getFbMessageIdMax()
+                - partition.getTargetPartition().getFbMessageIdMin() + 1;
+        numFbOnlyUsers = (partition.getTargetPartition().getFbUserKeyMax()
+                - partition.getTargetPartition().getFbUserKeyMin() + 1)
+                - partition.getTargetPartition().getCommonUsers();
+
+        totalTwMessages = partition.getTargetPartition().getTwMessageIdMax()
+                - partition.getTargetPartition().getTwMessageIdMin() + 1;
+        numTwOnlyUsers = (partition.getTargetPartition().getTwUserKeyMax()
+                - partition.getTargetPartition().getTwUserKeyMin() + 1)
+                - partition.getTargetPartition().getCommonUsers();
+
+        numCommonUsers = partition.getTargetPartition().getCommonUsers();
+        fbDistHandler = new DistributionHandler(totalFbMessages, 0.5, numFbOnlyUsers + numCommonUsers);
+        twDistHandler = new DistributionHandler(totalTwMessages, 0.5, numTwOnlyUsers + numCommonUsers);
+
+        fbUserId = partition.getTargetPartition().getFbUserKeyMin();
+        twUserId = partition.getTargetPartition().getTwUserKeyMin();
+
+        fbMessageId = partition.getTargetPartition().getFbMessageIdMin();
+        twMessageId = partition.getTargetPartition().getTwMessageIdMin();
+
+        outputDir = partition.getSourcePartition().getPath();
+        generateData();
+    }
+
+    public static void printUsage() {
+        System.out.println(" Error: Invalid number of arguments ");
+        System.out.println(" Usage :" + " DataGenerator <path to configuration file> <partition name> ");
+    }
+
+    public static void generateData() throws IOException {
+        generateFacebookOnlyUsers(numFbOnlyUsers);
+        generateTwitterOnlyUsers(numTwOnlyUsers);
+        generateCommonUsers();
+        System.out.println("Partition :" + partition.getTargetPartition().getName() + " finished");
+    }
+
+    public static void getFacebookUser(String usernameSuffix) {
+        String suggestedName = randNameGen.getRandomName();
+        String[] nameComponents = suggestedName.split(" ");
+        String name = nameComponents[0] + nameComponents[1];
+        if (usernameSuffix != null) {
+            name = name + usernameSuffix;
+        }
+        String alias = nameComponents[0];
+        String userSince = randDateGen.getNextRandomDatetime().toString();
+        int numFriends = random.nextInt(25);
+        int[] friendIds = RandomUtil.getKFromN(numFriends, (numFbOnlyUsers + numCommonUsers));
+        Employment emp = randEmpGen.getRandomEmployment();
+        fbUser.reset(fbUserId++, alias, name, userSince, friendIds, emp);
+    }
+
+    public static void getTwitterUser(String usernameSuffix) {
+        String suggestedName = randNameGen.getRandomName();
+        String[] nameComponents = suggestedName.split(" ");
+        String screenName = nameComponents[0] + nameComponents[1] + randNameGen.getRandomNameSuffix();
+        String name = suggestedName;
+        if (usernameSuffix != null) {
+            name = name + usernameSuffix;
+        }
+        int numFriends = random.nextInt((int) (100)); // draw from Zipfian
+        int statusesCount = random.nextInt(500); // draw from Zipfian
+        int followersCount = random.nextInt((int) (200));
+        twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
+        twUserId++;
+    }
+
+    public static void getCorrespondingTwitterUser(FacebookUser fbUser) {
+        String screenName = fbUser.getName().substring(0, fbUser.getName().lastIndexOf(commonUserFbSuffix))
+                + commonUserTwSuffix;
+        String name = screenName.split(" ")[0];
+        int numFriends = random.nextInt((int) ((numTwOnlyUsers + numCommonUsers)));
+        int statusesCount = random.nextInt(500); // draw from Zipfian
+        int followersCount = random.nextInt((int) (numTwOnlyUsers + numCommonUsers));
+        twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
+    }
+
+    public static class RandomDateGenerator {
+
+        private final Date startDate;
+        private final Date endDate;
+        private final Random random = new Random();
+        private final int yearDifference;
+        private Date workingDate;
+        private Date recentDate;
+        private DateTime dateTime;
+
+        public RandomDateGenerator(Date startDate, Date endDate) {
+            this.startDate = startDate;
+            this.endDate = endDate;
+            yearDifference = endDate.getYear() - startDate.getYear() + 1;
+            workingDate = new Date();
+            recentDate = new Date();
+            dateTime = new DateTime();
+        }
+
+        public Date getStartDate() {
+            return startDate;
+        }
+
+        public Date getEndDate() {
+            return endDate;
+        }
+
+        public Date getNextRandomDate() {
+            int year = random.nextInt(yearDifference) + startDate.getYear();
+            int month;
+            int day;
+            if (year == endDate.getYear()) {
+                month = random.nextInt(endDate.getMonth()) + 1;
+                if (month == endDate.getMonth()) {
+                    day = random.nextInt(endDate.getDay()) + 1;
+                } else {
+                    day = random.nextInt(28) + 1;
+                }
+            } else {
+                month = random.nextInt(12) + 1;
+                day = random.nextInt(28) + 1;
+            }
+            workingDate.reset(month, day, year);
+            return workingDate;
+        }
+
+        public DateTime getNextRandomDatetime() {
+            Date randomDate = getNextRandomDate();
+            dateTime.reset(randomDate);
+            return dateTime;
+        }
+
+        public Date getNextRecentDate(Date date) {
+            int year = date.getYear()
+                    + (date.getYear() == endDate.getYear() ? 0 : random.nextInt(endDate.getYear() - date.getYear()));
+            int month = (year == endDate.getYear()) ? date.getMonth() == endDate.getMonth() ? (endDate.getMonth())
+                    : (date.getMonth() + random.nextInt(endDate.getMonth() - date.getMonth())) : random.nextInt(12) + 1;
+
+            int day = (year == endDate.getYear()) ? month == endDate.getMonth() ? date.getDay() == endDate.getDay() ? endDate
+                    .getDay() : date.getDay() + random.nextInt(endDate.getDay() - date.getDay())
+                    : random.nextInt(28) + 1
+                    : random.nextInt(28) + 1;
+            recentDate.reset(month, day, year);
+            return recentDate;
+        }
+
+        public static void main(String args[]) throws Exception {
+            Date date = new Date(2, 20, 2012);
+            RandomDateGenerator dgen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
+            while (true) {
+                Date nextDate = dgen.getNextRandomDate();
+                if (nextDate.getDay() == 0) {
+                    throw new Exception("invalid date " + nextDate);
+                }
+
+                // System.out.println(" original date: " + date);
+                System.out.println(nextDate);
+            }
+        }
+    }
+
+    public static class DateTime extends Date {
+
+        private String hour = "10";
+        private String min = "10";
+        private String sec = "00";
+        private long chrononTime;
+
+        public DateTime(int month, int day, int year, String hour, String min, String sec) {
+            super(month, day, year);
+            this.hour = hour;
+            this.min = min;
+            this.sec = sec;
+            chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
+                    Integer.parseInt(sec)).getTime();
+        }
+
+        public void reset(int month, int day, int year, String hour, String min, String sec) {
+            super.setDay(month);
+            super.setDay(day);
+            super.setYear(year);
+            this.hour = hour;
+            this.min = min;
+            this.sec = sec;
+            chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
+                    Integer.parseInt(sec)).getTime();
+        }
+
+        public DateTime() {
+        }
+
+        public DateTime(Date date) {
+            super(date.getMonth(), date.getDay(), date.getYear());
+        }
+
+        public void reset(Date date) {
+            reset(date.getMonth(), date.getDay(), date.getYear());
+        }
+
+        public DateTime(Date date, int hour, int min, int sec) {
+            super(date.getMonth(), date.getDay(), date.getYear());
+            this.hour = (hour < 10) ? "0" : "" + hour;
+            this.min = (min < 10) ? "0" : "" + min;
+            this.sec = (sec < 10) ? "0" : "" + sec;
+        }
+
+        public long getChrononTime() {
+            return chrononTime;
+        }
+
+        public String getHour() {
+            return hour;
+        }
+
+        public String getMin() {
+            return min;
+        }
+
+        public String getSec() {
+            return sec;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("datetime");
+            builder.append("(\"");
+            builder.append(super.getYear());
+            builder.append("-");
+            builder.append(super.getMonth() < 10 ? "0" + super.getMonth() : super.getMonth());
+            builder.append("-");
+            builder.append(super.getDay() < 10 ? "0" + super.getDay() : super.getDay());
+            builder.append("T");
+            builder.append(hour + ":" + min + ":" + sec);
+            builder.append("\")");
+            return builder.toString();
+        }
+    }
+
+    public static class Message {
+
+        private char[] message = new char[500];
+        private List<String> referredTopics;
+        private int length;
+
+        public Message(char[] m, List<String> referredTopics) {
+            System.arraycopy(m, 0, message, 0, m.length);
+            length = m.length;
+            this.referredTopics = referredTopics;
+        }
+
+        public Message() {
+            referredTopics = new ArrayList<String>();
+            length = 0;
+        }
+
+        public char[] getMessage() {
+            return message;
+        }
+
+        public List<String> getReferredTopics() {
+            return referredTopics;
+        }
+
+        public void reset(char[] m, int offset, int length, List<String> referredTopics) {
+            System.arraycopy(m, offset, message, 0, length);
+            this.length = length;
+            this.referredTopics = referredTopics;
+        }
+
+        public int getLength() {
+            return length;
+        }
+
+        public char charAt(int index) {
+            return message[index];
+        }
+
+    }
+
+    public static class Point {
+
+        private float latitude;
+        private float longitude;
+
+        public float getLatitude() {
+            return latitude;
+        }
+
+        public float getLongitude() {
+            return longitude;
+        }
+
+        public Point(float latitude, float longitude) {
+            this.latitude = latitude;
+            this.longitude = longitude;
+        }
+
+        public void reset(float latitude, float longitude) {
+            this.latitude = latitude;
+            this.longitude = longitude;
+        }
+
+        public Point() {
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("point(\"" + latitude + "," + longitude + "\")");
+            return builder.toString();
+        }
+    }
+
+    public static class RandomNameGenerator {
+
+        private String[] firstNames;
+        private String[] lastNames;
+
+        private final Random random = new Random();
+
+        private final String[] connectors = new String[] { "_", "#", "$", "@" };
+
+        public RandomNameGenerator(String firstNameFilePath, String lastNameFilePath) throws IOException {
+            firstNames = FileUtil.listyFile(new File(firstNameFilePath)).toArray(new String[] {});
+            lastNames = FileUtil.listyFile(new File(lastNameFilePath)).toArray(new String[] {});
+        }
+
+        public RandomNameGenerator(String[] firstNames, String[] lastNames) {
+            this.firstNames = firstNames;
+            this.lastNames = lastNames;
+        }
+
+        public String getRandomName() {
+            String name;
+            name = getSuggestedName();
+            return name;
+
+        }
+
+        private String getSuggestedName() {
+            int firstNameIndex = random.nextInt(firstNames.length);
+            int lastNameIndex = random.nextInt(lastNames.length);
+            String suggestedName = firstNames[firstNameIndex] + " " + lastNames[lastNameIndex];
+            return suggestedName;
+        }
+
+        public String getRandomNameSuffix() {
+            return connectors[random.nextInt(connectors.length)] + random.nextInt(1000);
+        }
+    }
+
+    public static class RandomMessageGenerator {
+
+        private final MessageTemplate messageTemplate;
+
+        public RandomMessageGenerator(String vendorFilePath, String jargonFilePath) throws IOException {
+            List<String> vendors = FileUtil.listyFile(new File(vendorFilePath));
+            List<String> jargon = FileUtil.listyFile(new File(jargonFilePath));
+            this.messageTemplate = new MessageTemplate(vendors, jargon);
+        }
+
+        public RandomMessageGenerator(String[] vendors, String[] jargon) {
+            List<String> vendorList = new ArrayList<String>();
+            for (String v : vendors) {
+                vendorList.add(v);
+            }
+            List<String> jargonList = new ArrayList<String>();
+            for (String j : jargon) {
+                jargonList.add(j);
+            }
+            this.messageTemplate = new MessageTemplate(vendorList, jargonList);
+        }
+
+        public Message getNextRandomMessage() {
+            return messageTemplate.getNextMessage();
+        }
+    }
+
+    public static class AbstractMessageTemplate {
+
+        protected final Random random = new Random();
+
+        protected String[] positiveVerbs = new String[] { "like", "love" };
+        protected String[] negativeVerbs = new String[] { "dislike", "hate", "can't stand" };
+
+        protected String[] negativeAdjectives = new String[] { "horrible", "bad", "terrible", "OMG" };
+        protected String[] postiveAdjectives = new String[] { "good", "awesome", "amazing", "mind-blowing" };
+
+        protected String[] otherWords = new String[] { "the", "its" };
+    }
+
+    public static class MessageTemplate extends AbstractMessageTemplate {
+
+        private List<String> vendors;
+        private List<String> jargon;
+        private CharBuffer buffer;
+        private List<String> referredTopics;
+        private Message message = new Message();
+
+        public MessageTemplate(List<String> vendors, List<String> jargon) {
+            this.vendors = vendors;
+            this.jargon = jargon;
+            buffer = CharBuffer.allocate(2500);
+            referredTopics = new ArrayList<String>();
+        }
+
+        public Message getNextMessage() {
+            buffer.position(0);
+            buffer.limit(2500);
+            referredTopics.clear();
+            boolean isPositive = random.nextBoolean();
+            String[] verbArray = isPositive ? positiveVerbs : negativeVerbs;
+            String[] adjectiveArray = isPositive ? postiveAdjectives : negativeAdjectives;
+            String verb = verbArray[random.nextInt(verbArray.length)];
+            String adjective = adjectiveArray[random.nextInt(adjectiveArray.length)];
+
+            buffer.put(" ");
+            buffer.put(verb);
+            buffer.put(" ");
+            String vendor = vendors.get(random.nextInt(vendors.size()));
+            referredTopics.add(vendor);
+            buffer.append(vendor);
+            buffer.append(" ");
+            buffer.append(otherWords[random.nextInt(otherWords.length)]);
+            buffer.append(" ");
+            String jargonTerm = jargon.get(random.nextInt(jargon.size()));
+            referredTopics.add(jargonTerm);
+            buffer.append(jargonTerm);
+            buffer.append(" is ");
+            buffer.append(adjective);
+            if (random.nextBoolean()) {
+                buffer.append(isPositive ? ":)" : ":(");
+            }
+
+            buffer.flip();
+            message.reset(buffer.array(), 0, buffer.limit(), referredTopics);
+            return message;
+        }
+    }
+
+    public static class RandomUtil {
+
+        public static Random random = new Random();
+
+        public static int[] getKFromN(int k, int n) {
+            int[] result = new int[k];
+            int cnt = 0;
+            HashSet<Integer> values = new HashSet<Integer>();
+            while (cnt < k) {
+                int val = random.nextInt(n + 1);
+                if (values.contains(val)) {
+                    continue;
+                }
+
+                result[cnt++] = val;
+                values.add(val);
+            }
+            return result;
+        }
+    }
+
+    public static class FileUtil {
+
+        public static List<String> listyFile(File file) throws IOException {
+
+            BufferedReader reader = new BufferedReader(new FileReader(file));
+            String line;
+            List<String> list = new ArrayList<String>();
+            while (true) {
+                line = reader.readLine();
+                if (line == null) {
+                    break;
+                }
+                list.add(line);
+            }
+            return list;
+        }
+
+        public static FileAppender getFileAppender(String filePath, boolean createIfNotExists, boolean overwrite)
+                throws IOException {
+            return new FileAppender(filePath, createIfNotExists, overwrite);
+        }
+    }
+
+    public static class FileAppender {
+
+        private final BufferedWriter writer;
+
+        public FileAppender(String filePath, boolean createIfNotExists, boolean overwrite) throws IOException {
+            File file = new File(filePath);
+            if (!file.exists()) {
+                if (createIfNotExists) {
+                    new File(file.getParent()).mkdirs();
+                } else {
+                    throw new IOException("path " + filePath + " does not exists");
+                }
+            }
+            this.writer = new BufferedWriter(new FileWriter(file, !overwrite));
+        }
+
+        public void appendToFile(String content) throws IOException {
+            writer.append(content);
+            writer.append("\n");
+        }
+
+        public void close() throws IOException {
+            writer.close();
+        }
+    }
+
+    public static class RandomEmploymentGenerator {
+
+        private final int percentEmployed;
+        private final Random random = new Random();
+        private final RandomDateGenerator randDateGen;
+        private final List<String> organizations;
+        private Employment emp;
+
+        public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String orgListPath)
+                throws IOException {
+            this.percentEmployed = percentEmployed;
+            this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
+            organizations = FileUtil.listyFile(new File(orgListPath));
+            emp = new Employment();
+        }
+
+        public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String[] orgList) {
+            this.percentEmployed = percentEmployed;
+            this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
+            organizations = new ArrayList<String>();
+            for (String org : orgList) {
+                organizations.add(org);
+            }
+            emp = new Employment();
+        }
+
+        public Employment getRandomEmployment() {
+            int empployed = random.nextInt(100) + 1;
+            boolean isEmployed = false;
+            if (empployed <= percentEmployed) {
+                isEmployed = true;
+            }
+            Date startDate = randDateGen.getNextRandomDate();
+            Date endDate = null;
+            if (!isEmployed) {
+                endDate = randDateGen.getNextRecentDate(startDate);
+            }
+            String org = organizations.get(random.nextInt(organizations.size()));
+            emp.reset(org, startDate, endDate);
+            return emp;
+        }
+    }
+
+    public static class RandomLocationGenerator {
+
+        private Random random = new Random();
+
+        private final int beginLat;
+        private final int endLat;
+        private final int beginLong;
+        private final int endLong;
+
+        private Point point;
+
+        public RandomLocationGenerator(int beginLat, int endLat, int beginLong, int endLong) {
+            this.beginLat = beginLat;
+            this.endLat = endLat;
+            this.beginLong = beginLong;
+            this.endLong = endLong;
+            this.point = new Point();
+        }
+
+        public Point getRandomPoint() {
+            int latMajor = beginLat + random.nextInt(endLat - beginLat);
+            int latMinor = random.nextInt(100);
+            float latitude = latMajor + ((float) latMinor) / 100;
+
+            int longMajor = beginLong + random.nextInt(endLong - beginLong);
+            int longMinor = random.nextInt(100);
+            float longitude = longMajor + ((float) longMinor) / 100;
+
+            point.reset(latitude, longitude);
+            return point;
+        }
+
+    }
+
+    public static class PartitionConfiguration {
+
+        private final TargetPartition targetPartition;
+        private final SourcePartition sourcePartition;
+
+        public PartitionConfiguration(SourcePartition sourcePartition, TargetPartition targetPartition) {
+            this.sourcePartition = sourcePartition;
+            this.targetPartition = targetPartition;
+        }
+
+        public TargetPartition getTargetPartition() {
+            return targetPartition;
+        }
+
+        public SourcePartition getSourcePartition() {
+            return sourcePartition;
+        }
+
+    }
+
+    public static class SourcePartition {
+
+        private final String name;
+        private final String host;
+        private final String path;
+
+        public SourcePartition(String name, String host, String path) {
+            this.name = name;
+            this.host = host;
+            this.path = path;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public String getPath() {
+            return path;
+        }
+    }
+
+    public static class TargetPartition {
+        private final String name;
+        private final String host;
+        private final String path;
+        private final int fbUserKeyMin;
+        private final int fbUserKeyMax;
+        private final int twUserKeyMin;
+        private final int twUserKeyMax;
+        private final int fbMessageIdMin;
+        private final int fbMessageIdMax;
+        private final int twMessageIdMin;
+        private final int twMessageIdMax;
+        private final int commonUsers;
+
+        public TargetPartition(String partitionName, String host, String path, int fbUserKeyMin, int fbUserKeyMax,
+                int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
+                int twMessageIdMax, int commonUsers) {
+            this.name = partitionName;
+            this.host = host;
+            this.path = path;
+            this.fbUserKeyMin = fbUserKeyMin;
+            this.fbUserKeyMax = fbUserKeyMax;
+            this.twUserKeyMin = twUserKeyMin;
+            this.twUserKeyMax = twUserKeyMax;
+            this.twMessageIdMin = twMessageIdMin;
+            this.twMessageIdMax = twMessageIdMax;
+            this.fbMessageIdMin = fbMessageIdMin;
+            this.fbMessageIdMax = fbMessageIdMax;
+            this.commonUsers = commonUsers;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append(name);
+            builder.append(" ");
+            builder.append(host);
+            builder.append("\n");
+            builder.append(path);
+            builder.append("\n");
+            builder.append("fbUser:key:min");
+            builder.append(fbUserKeyMin);
+
+            builder.append("\n");
+            builder.append("fbUser:key:max");
+            builder.append(fbUserKeyMax);
+
+            builder.append("\n");
+            builder.append("twUser:key:min");
+            builder.append(twUserKeyMin);
+
+            builder.append("\n");
+            builder.append("twUser:key:max");
+            builder.append(twUserKeyMax);
+
+            builder.append("\n");
+            builder.append("fbMessage:key:min");
+            builder.append(fbMessageIdMin);
+
+            builder.append("\n");
+            builder.append("fbMessage:key:max");
+            builder.append(fbMessageIdMax);
+
+            builder.append("\n");
+            builder.append("twMessage:key:min");
+            builder.append(twMessageIdMin);
+
+            builder.append("\n");
+            builder.append("twMessage:key:max");
+            builder.append(twMessageIdMax);
+
+            builder.append("\n");
+            builder.append("twMessage:key:max");
+            builder.append(twMessageIdMax);
+
+            builder.append("\n");
+            builder.append("commonUsers");
+            builder.append(commonUsers);
+
+            return new String(builder);
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public int getFbUserKeyMin() {
+            return fbUserKeyMin;
+        }
+
+        public int getFbUserKeyMax() {
+            return fbUserKeyMax;
+        }
+
+        public int getTwUserKeyMin() {
+            return twUserKeyMin;
+        }
+
+        public int getTwUserKeyMax() {
+            return twUserKeyMax;
+        }
+
+        public int getFbMessageIdMin() {
+            return fbMessageIdMin;
+        }
+
+        public int getFbMessageIdMax() {
+            return fbMessageIdMax;
+        }
+
+        public int getTwMessageIdMin() {
+            return twMessageIdMin;
+        }
+
+        public int getTwMessageIdMax() {
+            return twMessageIdMax;
+        }
+
+        public int getCommonUsers() {
+            return commonUsers;
+        }
+
+        public String getPath() {
+            return path;
+        }
+    }
+
+    public static class Employment {
+
+        private String organization;
+        private Date startDate;
+        private Date endDate;
+
+        public Employment(String organization, Date startDate, Date endDate) {
+            this.organization = organization;
+            this.startDate = startDate;
+            this.endDate = endDate;
+        }
+
+        public Employment() {
+        }
+
+        public void reset(String organization, Date startDate, Date endDate) {
+            this.organization = organization;
+            this.startDate = startDate;
+            this.endDate = endDate;
+        }
+
+        public String getOrganization() {
+            return organization;
+        }
+
+        public Date getStartDate() {
+            return startDate;
+        }
+
+        public Date getEndDate() {
+            return endDate;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder("");
+            builder.append("{");
+            builder.append("\"organization-name\":");
+            builder.append("\"" + organization + "\"");
+            builder.append(",");
+            builder.append("\"start-date\":");
+            builder.append(startDate);
+            if (endDate != null) {
+                builder.append(",");
+                builder.append("\"end-date\":");
+                builder.append(endDate);
+            }
+            builder.append("}");
+            return new String(builder);
+        }
+
+    }
+
+    public static class FacebookMessage {
+
+        private int messageId;
+        private int authorId;
+        private int inResponseTo;
+        private Point senderLocation;
+        private Message message;
+
+        public int getMessageId() {
+            return messageId;
+        }
+
+        public int getAuthorID() {
+            return authorId;
+        }
+
+        public Point getSenderLocation() {
+            return senderLocation;
+        }
+
+        public Message getMessage() {
+            return message;
+        }
+
+        public int getInResponseTo() {
+            return inResponseTo;
+        }
+
+        public FacebookMessage() {
+
+        }
+
+        public FacebookMessage(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
+            this.messageId = messageId;
+            this.authorId = authorId;
+            this.inResponseTo = inResponseTo;
+            this.senderLocation = senderLocation;
+            this.message = message;
+        }
+
+        public void reset(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
+            this.messageId = messageId;
+            this.authorId = authorId;
+            this.inResponseTo = inResponseTo;
+            this.senderLocation = senderLocation;
+            this.message = message;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("{");
+            builder.append("\"message-id\":");
+            builder.append(messageId);
+            builder.append(",");
+            builder.append("\"author-id\":");
+            builder.append(authorId);
+            builder.append(",");
+            builder.append("\"in-response-to\":");
+            builder.append(inResponseTo);
+            builder.append(",");
+            builder.append("\"sender-location\":");
+            builder.append(senderLocation);
+            builder.append(",");
+            builder.append("\"message\":");
+            builder.append("\"");
+            for (int i = 0; i < message.getLength(); i++) {
+                builder.append(message.charAt(i));
+            }
+            builder.append("\"");
+            builder.append("}");
+            return new String(builder);
+        }
+    }
+
+    public static class FacebookUser {
+
+        private int id;
+        private String alias;
+        private String name;
+        private String userSince;
+        private int[] friendIds;
+        private Employment employment;
+
+        public FacebookUser() {
+
+        }
+
+        public FacebookUser(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
+            this.id = id;
+            this.alias = alias;
+            this.name = name;
+            this.userSince = userSince;
+            this.friendIds = friendIds;
+            this.employment = employment;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public String getAlias() {
+            return alias;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getUserSince() {
+            return userSince;
+        }
+
+        public int[] getFriendIds() {
+            return friendIds;
+        }
+
+        public Employment getEmployment() {
+            return employment;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("{");
+            builder.append("\"id\":" + id);
+            builder.append(",");
+            builder.append("\"alias\":" + "\"" + alias + "\"");
+            builder.append(",");
+            builder.append("\"name\":" + "\"" + name + "\"");
+            builder.append(",");
+            builder.append("\"user-since\":" + userSince);
+            builder.append(",");
+            builder.append("\"friend-ids\":");
+            builder.append("{{");
+            for (int i = 0; i < friendIds.length; i++) {
+                builder.append(friendIds[i]);
+                builder.append(",");
+            }
+            if (friendIds.length > 0) {
+                builder.deleteCharAt(builder.lastIndexOf(","));
+            }
+            builder.append("}}");
+            builder.append(",");
+            builder.append("\"employment\":");
+            builder.append("[");
+            builder.append(employment.toString());
+            builder.append("]");
+            builder.append("}");
+            return builder.toString();
+        }
+
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        public void setAlias(String alias) {
+            this.alias = alias;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public void setUserSince(String userSince) {
+            this.userSince = userSince;
+        }
+
+        public void setFriendIds(int[] friendIds) {
+            this.friendIds = friendIds;
+        }
+
+        public void setEmployment(Employment employment) {
+            this.employment = employment;
+        }
+
+        public void reset(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
+            this.id = id;
+            this.alias = alias;
+            this.name = name;
+            this.userSince = userSince;
+            this.friendIds = friendIds;
+            this.employment = employment;
+        }
+    }
+
+    public static class TweetMessage {
+
+        private String tweetid;
+        private TwitterUser user;
+        private Point senderLocation;
+        private DateTime sendTime;
+        private List<String> referredTopics;
+        private Message messageText;
+
+        public TweetMessage() {
+
+        }
+
+        public TweetMessage(String tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
+                List<String> referredTopics, Message messageText) {
+            this.tweetid = tweetid;
+            this.user = user;
+            this.senderLocation = senderLocation;
+            this.sendTime = sendTime;
+            this.referredTopics = referredTopics;
+            this.messageText = messageText;
+        }
+
+        public void reset(String tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
+                List<String> referredTopics, Message messageText) {
+            this.tweetid = tweetid;
+            this.user = user;
+            this.senderLocation = senderLocation;
+            this.sendTime = sendTime;
+            this.referredTopics = referredTopics;
+            this.messageText = messageText;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("{");
+            builder.append("\"tweetid\":");
+            builder.append("\"" + tweetid + "\"");
+            builder.append(",");
+            builder.append("\"user\":");
+            builder.append(user);
+            builder.append(",");
+            builder.append("\"sender-location\":");
+            builder.append(senderLocation);
+            builder.append(",");
+            builder.append("\"send-time\":");
+            builder.append(sendTime);
+            builder.append(",");
+            builder.append("\"referred-topics\":");
+            builder.append("{{");
+            for (String topic : referredTopics) {
+                builder.append("\"" + topic + "\"");
+                builder.append(",");
+            }
+            if (referredTopics.size() > 0) {
+                builder.deleteCharAt(builder.lastIndexOf(","));
+            }
+            builder.append("}}");
+            builder.append(",");
+            builder.append("\"message-text\":");
+            builder.append("\"");
+            for (int i = 0; i < messageText.getLength(); i++) {
+                builder.append(messageText.charAt(i));
+            }
+            builder.append("\"");
+            builder.append("}");
+            return new String(builder);
+        }
+
+        public String getTweetid() {
+            return tweetid;
+        }
+
+        public void setTweetid(String tweetid) {
+            this.tweetid = tweetid;
+        }
+
+        public TwitterUser getUser() {
+            return user;
+        }
+
+        public void setUser(TwitterUser user) {
+            this.user = user;
+        }
+
+        public Point getSenderLocation() {
+            return senderLocation;
+        }
+
+        public void setSenderLocation(Point senderLocation) {
+            this.senderLocation = senderLocation;
+        }
+
+        public DateTime getSendTime() {
+            return sendTime;
+        }
+
+        public void setSendTime(DateTime sendTime) {
+            this.sendTime = sendTime;
+        }
+
+        public List<String> getReferredTopics() {
+            return referredTopics;
+        }
+
+        public void setReferredTopics(List<String> referredTopics) {
+            this.referredTopics = referredTopics;
+        }
+
+        public Message getMessageText() {
+            return messageText;
+        }
+
+        public void setMessageText(Message messageText) {
+            this.messageText = messageText;
+        }
+
+    }
+
+    public static class TwitterUser {
+
+        private String screenName;
+        private String lang = "en";
+        private int friendsCount;
+        private int statusesCount;
+        private String name;
+        private int followersCount;
+
+        public TwitterUser() {
+
+        }
+
+        public TwitterUser(String screenName, int friendsCount, int statusesCount, String name, int followersCount) {
+            this.screenName = screenName;
+            this.friendsCount = friendsCount;
+            this.statusesCount = statusesCount;
+            this.name = name;
+            this.followersCount = followersCount;
+        }
+
+        public void reset(String screenName, int friendsCount, int statusesCount, String name, int followersCount) {
+            this.screenName = screenName;
+            this.friendsCount = friendsCount;
+            this.statusesCount = statusesCount;
+            this.name = name;
+            this.followersCount = followersCount;
+        }
+
+        public String getScreenName() {
+            return screenName;
+        }
+
+        public int getFriendsCount() {
+            return friendsCount;
+        }
+
+        public int getStatusesCount() {
+            return statusesCount;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public int getFollowersCount() {
+            return followersCount;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("{");
+            builder.append("\"screen-name\":" + "\"" + screenName + "\"");
+            builder.append(",");
+            builder.append("\"lang\":" + "\"" + lang + "\"");
+            builder.append(",");
+            builder.append("\"friends_count\":" + friendsCount);
+            builder.append(",");
+            builder.append("\"statuses_count\":" + statusesCount);
+            builder.append(",");
+            builder.append("\"name\":" + "\"" + name + "\"");
+            builder.append(",");
+            builder.append("\"followers_count\":" + followersCount);
+            builder.append("}");
+            return builder.toString();
+        }
+
+    }
+
+    public static class DistributionHandler {
+
+        private final ZipfGenerator zipfGen;
+        private final int totalUsers;
+        private final int totalMessages;
+        private Random random = new Random();
+
+        public DistributionHandler(int totalMessages, double skew, int totalNumUsers) {
+            zipfGen = new ZipfGenerator(totalMessages, skew);
+            totalUsers = totalNumUsers;
+            this.totalMessages = totalMessages;
+        }
+
+        public int getFromDistribution(int rank) {
+            double prob = zipfGen.getProbability(rank);
+            int numMessages = (int) (prob * totalMessages);
+            return numMessages;
+        }
+
+        public static void main(String args[]) {
+            int totalMessages = 1000 * 4070;
+            double skew = 0.5;
+            int totalUsers = 4070;
+            DistributionHandler d = new DistributionHandler(totalMessages, skew, totalUsers);
+            int sum = 0;
+            for (int i = totalUsers; i >= 1; i--) {
+                float contrib = d.getFromDistribution(i);
+                sum += contrib;
+                System.out.println(i + ":" + contrib);
+            }
+
+            System.out.println("SUM" + ":" + sum);
+
+        }
+    }
+
+    public static class ZipfGenerator {
+
+        private Random rnd = new Random(System.currentTimeMillis());
+        private int size;
+        private double skew;
+        private double bottom = 0;
+
+        public ZipfGenerator(int size, double skew) {
+            this.size = size;
+            this.skew = skew;
+            for (int i = 1; i < size; i++) {
+                this.bottom += (1 / Math.pow(i, this.skew));
+            }
+        }
+
+        // the next() method returns an rank id. The frequency of returned rank
+        // ids are follows Zipf distribution.
+        public int next() {
+            int rank;
+            double friquency = 0;
+            double dice;
+            rank = rnd.nextInt(size);
+            friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+            dice = rnd.nextDouble();
+            while (!(dice < friquency)) {
+                rank = rnd.nextInt(size);
+                friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+                dice = rnd.nextDouble();
+            }
+            return rank;
+        }
+
+        // This method returns a probability that the given rank occurs.
+        public double getProbability(int rank) {
+            return (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+        }
+
+        public static void main(String[] args) throws IOException {
+            int total = (int) (3.7 * 1000 * 1000);
+            int skew = 2;
+            int numUsers = 1000 * 1000;
+            /*
+             * if (args.length != 2) { System.out.println("usage:" +
+             * "./zipf size skew"); System.exit(-1); }
+             */
+            BufferedWriter buf = new BufferedWriter(new FileWriter(new File("/tmp/zip_output")));
+            ZipfGenerator zipf = new ZipfGenerator(total, skew);
+            double sum = 0;
+            for (int i = 1; i <= numUsers; i++) {
+                double prob = zipf.getProbability(i);
+                double contribution = (double) (prob * total);
+                String contrib = i + ":" + contribution;
+                buf.write(contrib);
+                buf.write("\n");
+                System.out.println(contrib);
+                sum += contribution;
+            }
+            System.out.println("sum is :" + sum);
+        }
+    }
+
+    public static class PartitionElement implements ILibraryElement {
+        private final String name;
+        private final String host;
+        private final int fbUserKeyMin;
+        private final int fbUserKeyMax;
+        private final int twUserKeyMin;
+        private final int twUserKeyMax;
+        private final int fbMessageIdMin;
+        private final int fbMessageIdMax;
+        private final int twMessageIdMin;
+        private final int twMessageIdMax;
+
+        public PartitionElement(String partitionName, String host, int fbUserKeyMin, int fbUserKeyMax,
+                int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
+                int twMessageIdMax) {
+            this.name = partitionName;
+            this.host = host;
+            this.fbUserKeyMin = fbUserKeyMin;
+            this.fbUserKeyMax = fbUserKeyMax;
+            this.twUserKeyMin = twUserKeyMax;
+            this.twUserKeyMax = twUserKeyMax;
+            this.twMessageIdMin = twMessageIdMin;
+            this.twMessageIdMax = twMessageIdMax;
+            this.fbMessageIdMin = fbMessageIdMin;
+            this.fbMessageIdMax = fbMessageIdMax;
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append(name);
+            builder.append(" ");
+            builder.append(host);
+            builder.append("\n");
+            builder.append("fbUser:key:min");
+            builder.append(fbUserKeyMin);
+
+            builder.append("\n");
+            builder.append("fbUser:key:max");
+            builder.append(fbUserKeyMax);
+
+            builder.append("\n");
+            builder.append("twUser:key:min");
+            builder.append(twUserKeyMin);
+
+            builder.append("\n");
+            builder.append("twUser:key:max");
+            builder.append(twUserKeyMax);
+
+            builder.append("\n");
+            builder.append("fbMessage:key:min");
+            builder.append(fbMessageIdMin);
+
+            builder.append("\n");
+            builder.append("fbMessage:key:max");
+            builder.append(fbMessageIdMax);
+
+            builder.append("\n");
+            builder.append("twMessage:key:min");
+            builder.append(twMessageIdMin);
+
+            builder.append("\n");
+            builder.append("twMessage:key:max");
+            builder.append(twMessageIdMax);
+
+            builder.append("\n");
+            builder.append("twMessage:key:max");
+            builder.append(twUserKeyMin);
+
+            return new String(builder);
+        }
+
+        @Override
+        public String getName() {
+            return "Partition";
+        }
+
+    }
+
+    interface ILibraryElement {
+
+        public enum ElementType {
+            PARTITION
+        }
+
+        public String getName();
+
+    }
+
+    public static class Configuration {
+
+        private final float numMB;
+        private final String unit;
+
+        private final List<SourcePartition> sourcePartitions;
+        private List<TargetPartition> targetPartitions;
+
+        public Configuration(float numMB, String unit, List<SourcePartition> partitions) throws IOException {
+            this.numMB = numMB;
+            this.unit = unit;
+            this.sourcePartitions = partitions;
+
+        }
+
+        public float getNumMB() {
+            return numMB;
+        }
+
+        public String getUnit() {
+            return unit;
+        }
+
+        public List<SourcePartition> getSourcePartitions() {
+            return sourcePartitions;
+        }
+
+        public List<TargetPartition> getTargetPartitions() {
+            return targetPartitions;
+        }
+
+        public void setTargetPartitions(List<TargetPartition> targetPartitions) {
+            this.targetPartitions = targetPartitions;
+        }
+
+    }
+
+    public static class XMLUtil {
+
+        public static void writeToXML(Configuration conf, String filePath) throws IOException,
+                ParserConfigurationException, TransformerException {
+
+            DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+
+            // root elements
+            Document doc = docBuilder.newDocument();
+            Element rootElement = doc.createElement("Partitions");
+            doc.appendChild(rootElement);
+
+            int index = 0;
+            for (TargetPartition partition : conf.getTargetPartitions()) {
+                writePartitionElement(conf.getSourcePartitions().get(index), partition, rootElement, doc);
+            }
+
+            TransformerFactory transformerFactory = TransformerFactory.newInstance();
+            Transformer transformer = transformerFactory.newTransformer();
+
+            transformer.setOutputProperty(OutputKeys.ENCODING, "utf-8");
+            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+
+            DOMSource source = new DOMSource(doc);
+            StreamResult result = new StreamResult(new File(filePath));
+
+            transformer.transform(source, result);
+
+        }
+
+        public static void writePartitionInfo(Configuration conf, String filePath) throws IOException {
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePath));
+            for (SourcePartition sp : conf.getSourcePartitions()) {
+                bw.write(sp.getHost() + ":" + sp.getName() + ":" + sp.getPath());
+                bw.write("\n");
+            }
+            bw.close();
+        }
+
+        public static Document getDocument(String filePath) throws Exception {
+            File inputFile = new File(filePath);
+            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+            DocumentBuilder db = dbf.newDocumentBuilder();
+            Document doc = db.parse(inputFile);
+            doc.getDocumentElement().normalize();
+            return doc;
+        }
+
+        public static Configuration getConfiguration(String filePath) throws Exception {
+            Configuration conf = getConfiguration(getDocument(filePath));
+            PartitionMetrics metrics = new PartitionMetrics(conf.getNumMB(), conf.getUnit(), conf.getSourcePartitions()
+                    .size());
+            List<TargetPartition> targetPartitions = getTargetPartitions(metrics, conf.getSourcePartitions());
+            conf.setTargetPartitions(targetPartitions);
+            return conf;
+        }
+
+        public static Configuration getConfiguration(Document document) throws IOException {
+            Element rootEle = document.getDocumentElement();
+            NodeList nodeList = rootEle.getChildNodes();
+            float size = Float.parseFloat(getStringValue((Element) nodeList, "size"));
+            String unit = getStringValue((Element) nodeList, "unit");
+            List<SourcePartition> sourcePartitions = getSourcePartitions(document);
+            return new Configuration(size, unit, sourcePartitions);
+        }
+
+        public static List<SourcePartition> getSourcePartitions(Document document) {
+            Element rootEle = document.getDocumentElement();
+            NodeList nodeList = rootEle.getElementsByTagName("partition");
+            List<SourcePartition> sourcePartitions = new ArrayList<SourcePartition>();
+            for (int i = 0; i < nodeList.getLength(); i++) {
+                Node node = nodeList.item(i);
+                sourcePartitions.add(getSourcePartition((Element) node));
+            }
+            return sourcePartitions;
+        }
+
+        public static SourcePartition getSourcePartition(Element functionElement) {
+            String name = getStringValue(functionElement, "name");
+            String host = getStringValue(functionElement, "host");
+            String path = getStringValue(functionElement, "path");
+            SourcePartition sp = new SourcePartition(name, host, path);
+            return sp;
+        }
+
+        public static String getStringValue(Element element, String tagName) {
+            String textValue = null;
+            NodeList nl = element.getElementsByTagName(tagName);
+            if (nl != null && nl.getLength() > 0) {
+                Element el = (Element) nl.item(0);
+                textValue = el.getFirstChild().getNodeValue();
+            }
+            return textValue;
+        }
+
+        public static PartitionConfiguration getPartitionConfiguration(String filePath, String partitionName)
+                throws Exception {
+            PartitionConfiguration pconf = getPartitionConfigurations(getDocument(filePath), partitionName);
+            return pconf;
+        }
+
+        public static PartitionConfiguration getPartitionConfigurations(Document document, String partitionName)
+                throws IOException {
+
+            Element rootEle = document.getDocumentElement();
+            NodeList nodeList = rootEle.getElementsByTagName("Partition");
+
+            for (int i = 0; i < nodeList.getLength(); i++) {
+                Node node = nodeList.item(i);
+                Element nodeElement = (Element) node;
+                String name = getStringValue(nodeElement, "name");
+                if (!name.equalsIgnoreCase(partitionName)) {
+                    continue;
+                }
+                String host = getStringValue(nodeElement, "host");
+                String path = getStringValue(nodeElement, "path");
+
+                String fbUserKeyMin = getStringValue(nodeElement, "fbUserKeyMin");
+                String fbUserKeyMax = getStringValue(nodeElement, "fbUserKeyMax");
+                String twUserKeyMin = getStringValue(nodeElement, "twUserKeyMin");
+                String twUserKeyMax = getStringValue(nodeElement, "twUserKeyMax");
+                String fbMessageKeyMin = getStringValue(nodeElement, "fbMessageKeyMin");
+
+                String fbMessageKeyMax = getStringValue(nodeElement, "fbMessageKeyMax");
+                String twMessageKeyMin = getStringValue(nodeElement, "twMessageKeyMin");
+                String twMessageKeyMax = getStringValue(nodeElement, "twMessageKeyMax");
+                String numCommonUsers = getStringValue(nodeElement, "numCommonUsers");
+
+                SourcePartition sp = new SourcePartition(name, host, path);
+
+                TargetPartition tp = new TargetPartition(partitionName, host, path, Integer.parseInt(fbUserKeyMin),
+                        Integer.parseInt(fbUserKeyMax), Integer.parseInt(twUserKeyMin), Integer.parseInt(twUserKeyMax),
+                        Integer.parseInt(fbMessageKeyMin), Integer.parseInt(fbMessageKeyMax),
+                        Integer.parseInt(twMessageKeyMin), Integer.parseInt(twMessageKeyMax),
+                        Integer.parseInt(numCommonUsers));
+                PartitionConfiguration pc = new PartitionConfiguration(sp, tp);
+                return pc;
+            }
+            return null;
+        }
+
+        public static List<TargetPartition> getTargetPartitions(PartitionMetrics metrics,
+                List<SourcePartition> sourcePartitions) {
+            List<TargetPartition> partitions = new ArrayList<TargetPartition>();
+            int fbUserKeyMin = 1;
+            int twUserKeyMin = 1;
+            int fbMessageIdMin = 1;
+            int twMessageIdMin = 1;
+
+            for (SourcePartition sp : sourcePartitions) {
+                int fbUserKeyMax = fbUserKeyMin + metrics.getFbOnlyUsers() + metrics.getCommonUsers() - 1;
+                int twUserKeyMax = twUserKeyMin + metrics.getTwitterOnlyUsers() + metrics.getCommonUsers() - 1;
+
+                int fbMessageIdMax = fbMessageIdMin + metrics.getFbMessages() - 1;
+                int twMessageIdMax = twMessageIdMin + metrics.getTwMessages() - 1;
+                TargetPartition pe = new TargetPartition(sp.getName(), sp.getHost(), sp.getPath(), fbUserKeyMin,
+                        fbUserKeyMax, twUserKeyMin, twUserKeyMax, fbMessageIdMin, fbMessageIdMax, twMessageIdMin,
+                        twMessageIdMax, metrics.getCommonUsers());
+                partitions.add(pe);
+
+                fbUserKeyMin = fbUserKeyMax + 1;
+                twUserKeyMin = twUserKeyMax + 1;
+
+                fbMessageIdMin = fbMessageIdMax + 1;
+                twMessageIdMin = twMessageIdMax + 1;
+            }
+
+            return partitions;
+        }
+
+        public static void writePartitionElement(SourcePartition sourcePartition, TargetPartition partition,
+                Element rootElement, Document doc) {
+            // staff elements
+            Element pe = doc.createElement("Partition");
+            rootElement.appendChild(pe);
+
+            // name element
+            Element name = doc.createElement("name");
+            name.appendChild(doc.createTextNode("" + partition.getName()));
+            pe.appendChild(name);
+
+            // host element
+            Element host = doc.createElement("host");
+            host.appendChild(doc.createTextNode("" + partition.getHost()));
+            pe.appendChild(host);
+
+            // path element
+            Element path = doc.createElement("path");
+            path.appendChild(doc.createTextNode("" + partition.getPath()));
+            pe.appendChild(path);
+
+            // fbUserKeyMin element
+            Element fbUserKeyMin = doc.createElement("fbUserKeyMin");
+            fbUserKeyMin.appendChild(doc.createTextNode("" + partition.getFbUserKeyMin()));
+            pe.appendChild(fbUserKeyMin);
+
+            // fbUserKeyMax element
+            Element fbUserKeyMax = doc.createElement("fbUserKeyMax");
+            fbUserKeyMax.appendChild(doc.createTextNode("" + partition.getFbUserKeyMax()));
+            pe.appendChild(fbUserKeyMax);
+
+            // twUserKeyMin element
+            Element twUserKeyMin = doc.createElement("twUserKeyMin");
+            twUserKeyMin.appendChild(doc.createTextNode("" + partition.getTwUserKeyMin()));
+            pe.appendChild(twUserKeyMin);
+
+            // twUserKeyMax element
+            Element twUserKeyMax = doc.createElement("twUserKeyMax");
+            twUserKeyMax.appendChild(doc.createTextNode("" + partition.getTwUserKeyMax()));
+            pe.appendChild(twUserKeyMax);
+
+            // fbMessgeKeyMin element
+            Element fbMessageKeyMin = doc.createElement("fbMessageKeyMin");
+            fbMessageKeyMin.appendChild(doc.createTextNode("" + partition.getFbMessageIdMin()));
+            pe.appendChild(fbMessageKeyMin);
+
+            // fbMessgeKeyMin element
+            Element fbMessageKeyMax = doc.createElement("fbMessageKeyMax");
+            fbMessageKeyMax.appendChild(doc.createTextNode("" + partition.getFbMessageIdMax()));
+            pe.appendChild(fbMessageKeyMax);
+
+            // twMessgeKeyMin element
+            Element twMessageKeyMin = doc.createElement("twMessageKeyMin");
+            twMessageKeyMin.appendChild(doc.createTextNode("" + partition.getTwMessageIdMin()));
+            pe.appendChild(twMessageKeyMin);
+
+            // twMessgeKeyMin element
+            Element twMessageKeyMax = doc.createElement("twMessageKeyMax");
+            twMessageKeyMax.appendChild(doc.createTextNode("" + partition.getTwMessageIdMax()));
+            pe.appendChild(twMessageKeyMax);
+
+            // twMessgeKeyMin element
+            Element numCommonUsers = doc.createElement("numCommonUsers");
+            numCommonUsers.appendChild(doc.createTextNode("" + partition.getCommonUsers()));
+            pe.appendChild(numCommonUsers);
+
+        }
+
+        public static void main(String args[]) throws Exception {
+            String confFile = "/Users/rgrove1/work/research/asterix/icde/data-gen/conf/conf.xml";
+            String outputPath = "/Users/rgrove1/work/research/asterix/icde/data-gen/output/conf-output.xml";
+            Configuration conf = getConfiguration(confFile);
+            writeToXML(conf, outputPath);
+        }
+
+    }
+
+    public static class Date {
+
+        private int day;
+        private int month;
+        private int year;
+
+        public Date(int month, int day, int year) {
+            this.month = month;
+            this.day = day;
+            this.year = year;
+        }
+
+        public void reset(int month, int day, int year) {
+            this.month = month;
+            this.day = day;
+            this.year = year;
+        }
+
+        public int getDay() {
+            return day;
+        }
+
+        public int getMonth() {
+            return month;
+        }
+
+        public int getYear() {
+            return year;
+        }
+
+        public Date() {
+        }
+
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("date");
+            builder.append("(\"");
+            builder.append(year);
+            builder.append("-");
+            builder.append(month < 10 ? "0" + month : "" + month);
+            builder.append("-");
+            builder.append(day < 10 ? "0" + day : "" + day);
+            builder.append("\")");
+            return builder.toString();
+        }
+
+        public void setDay(int day) {
+            this.day = day;
+        }
+
+        public void setMonth(int month) {
+            this.month = month;
+        }
+
+        public void setYear(int year) {
+            this.year = year;
+        }
+    }
+
+    public static class PartitionMetrics {
+
+        private final int fbMessages;
+        private final int twMessages;
+
+        private final int fbOnlyUsers;
+        private final int twitterOnlyUsers;
+        private final int commonUsers;
+
+        public PartitionMetrics(float number, String unit, int numPartitions) throws IOException {
+
+            int factor = 0;
+            if (unit.equalsIgnoreCase("MB")) {
+                factor = 1024 * 1024;
+            } else if (unit.equalsIgnoreCase("GB")) {
+                factor = 1024 * 1024 * 1024;
+            } else if (unit.equalsIgnoreCase("TB")) {
+                factor = 1024 * 1024 * 1024 * 1024;
+            } else
+                throw new IOException("Invalid unit:" + unit);
+
+            fbMessages = (int) (((number * factor * 0.80) / 258) / numPartitions);
+            twMessages = (int) (fbMessages * 1.1 / 0.35);
+
+            fbOnlyUsers = (int) ((number * factor * 0.20 * 0.0043)) / numPartitions;
+            twitterOnlyUsers = (int) (0.25 * fbOnlyUsers);
+            commonUsers = (int) (0.1 * fbOnlyUsers);
+        }
+
+        public int getFbMessages() {
+            return fbMessages;
+        }
+
+        public int getTwMessages() {
+            return twMessages;
+        }
+
+        public int getFbOnlyUsers() {
+            return fbOnlyUsers;
+        }
+
+        public int getTwitterOnlyUsers() {
+            return twitterOnlyUsers;
+        }
+
+        public int getCommonUsers() {
+            return commonUsers;
+        }
+
+    }
+
+    public static String[] lastNames = { "Hoopengarner", "Harrow", "Gardner", "Blyant", "Best", "Buttermore", "Gronko",
+            "Mayers", "Countryman", "Neely", "Ruhl", "Taggart", "Bash", "Cason", "Hil", "Zalack", "Mingle", "Carr",
+            "Rohtin", "Wardle", "Pullman", "Wire", "Kellogg", "Hiles", "Keppel", "Bratton", "Sutton", "Wickes",
+            "Muller", "Friedline", "Llora", "Elizabeth", "Anderson", "Gaskins", "Rifler", "Vinsant", "Stanfield",
+            "Black", "Guest", "Hujsak", "Carter", "Weidemann", "Hays", "Patton", "Hayhurst", "Paynter", "Cressman",
+            "Fiddler", "Evans", "Sherlock", "Woodworth", "Jackson", "Bloise", "Schneider", "Ring", "Kepplinger",
+            "James", "Moon", "Bennett", "Bashline", "Ryals", "Zeal", "Christman", "Milliron", "Nash", "Ewing", "Camp",
+            "Mason", "Richardson", "Bowchiew", "Hahn", "Wilson", "Wood", "Toyley", "Williamson", "Lafortune", "Errett",
+            "Saltser", "Hirleman", "Brindle", "Newbiggin", "Ulery", "Lambert", "Shick", "Kuster", "Moore", "Finck",
+            "Powell", "Jolce", "Townsend", "Sauter", "Cowher", "Wolfe", "Cavalet", "Porter", "Laborde", "Ballou",
+            "Murray", "Stoddard", "Pycroft", "Milne", "King", "Todd", "Staymates", "Hall", "Romanoff", "Keilbach",
+            "Sandford", "Hamilton", "Fye", "Kline", "Weeks", "Mcelroy", "Mccullough", "Bryant", "Hill", "Moore",
+            "Ledgerwood", "Prevatt", "Eckert", "Read", "Hastings", "Doverspike", "Allshouse", "Bryan", "Mccallum",
+            "Lombardi", "Mckendrick", "Cattley", "Barkley", "Steiner", "Finlay", "Priebe", "Armitage", "Hall", "Elder",
+            "Erskine", "Hatcher", "Walker", "Pearsall", "Dunkle", "Haile", "Adams", "Miller", "Newbern", "Basinger",
+            "Fuhrer", "Brinigh", "Mench", "Blackburn", "Bastion", "Mccune", "Bridger", "Hynes", "Quinn", "Courtney",
+            "Geddinge", "Field", "Seelig", "Cable", "Earhart", "Harshman", "Roby", "Beals", "Berry", "Reed", "Hector",
+            "Pittman", "Haverrman", "Kalp", "Briner", "Joghs", "Cowart", "Close", "Wynne", "Harden", "Weldy",
+            "Stephenson", "Hildyard", "Moberly", "Wells", "Mackendoerfer", "Fisher", "Oppie", "Oneal", "Churchill",
+            "Keister", "Alice", "Tavoularis", "Fisher", "Hair", "Burns", "Veith", "Wile", "Fuller", "Fields", "Clark",
+            "Randolph", "Stone", "Mcclymonds", "Holtzer", "Donkin", "Wilkinson", "Rosensteel", "Albright", "Stahl",
+            "Fox", "Kadel", "Houser", "Hanseu", "Henderson", "Davis", "Bicknell", "Swain", "Mercer", "Holdeman",
+            "Enderly", "Caesar", "Margaret", "Munshower", "Elless", "Lucy", "Feufer", "Schofield", "Graham",
+            "Blatenberger", "Benford", "Akers", "Campbell", "Ann", "Sadley", "Ling", "Gongaware", "Schmidt", "Endsley",
+            "Groah", "Flanders", "Reichard", "Lowstetter", "Sandblom", "Griffis", "Basmanoff", "Coveney", "Hawker",
+            "Archibald", "Hutton", "Barnes", "Diegel", "Raybould", "Focell", "Breitenstein", "Murray", "Chauvin",
+            "Busk", "Pheleps", "Teagarden", "Northey", "Baumgartner", "Fleming", "Harris", "Parkinson", "Carpenter",
+            "Whirlow", "Bonner", "Wortman", "Rogers", "Scott", "Lowe", "Mckee", "Huston", "Bullard", "Throckmorton",
+            "Rummel", "Mathews", "Dull", "Saline", "Tue", "Woolery", "Lalty", "Schrader", "Ramsey", "Eisenmann",
+            "Philbrick", "Sybilla", "Wallace", "Fonblanque", "Paul", "Orbell", "Higgens", "Casteel", "Franks",
+            "Demuth", "Eisenman", "Hay", "Robinson", "Fischer", "Hincken", "Wylie", "Leichter", "Bousum",
+            "Littlefield", "Mcdonald", "Greif", "Rhodes", "Wall", "Steele", "Baldwin", "Smith", "Stewart", "Schere",
+            "Mary", "Aultman", "Emrick", "Guess", "Mitchell", "Painter", "Aft", "Hasely", "Weldi", "Loewentsein",
+            "Poorbaugh", "Kepple", "Noton", "Judge", "Jackson", "Style", "Adcock", "Diller", "Marriman", "Johnston",
+            "Children", "Monahan", "Ehret", "Shaw", "Congdon", "Pinney", "Millard", "Crissman", "Tanner", "Rose",
+            "Knisely", "Cypret", "Sommer", "Poehl", "Hardie", "Bender", "Overholt", "Gottwine", "Beach", "Leslie",
+            "Trevithick", "Langston", "Magor", "Shotts", "Howe", "Hunter", "Cross", "Kistler", "Dealtry", "Christner",
+            "Pennington", "Thorley", "Eckhardstein", "Van", "Stroh", "Stough", "Stall", "Beedell", "Shea", "Garland",
+            "Mays", "Pritchard", "Frankenberger", "Rowley", "Lane", "Baum", "Alliman", "Park", "Jardine", "Butler",
+            "Cherry", "Kooser", "Baxter", "Billimek", "Downing", "Hurst", "Wood", "Baird", "Watkins", "Edwards",
+            "Kemerer", "Harding", "Owens", "Eiford", "Keener", "Garneis", "Fiscina", "Mang", "Draudy", "Mills",
+            "Gibson", "Reese", "Todd", "Ramos", "Levett", "Wilks", "Ward", "Mosser", "Dunlap", "Kifer", "Christopher",
+            "Ashbaugh", "Wynter", "Rawls", "Cribbs", "Haynes", "Thigpen", "Schreckengost", "Bishop", "Linton",
+            "Chapman", "James", "Jerome", "Hook", "Omara", "Houston", "Maclagan", "Sandys", "Pickering", "Blois",
+            "Dickson", "Kemble", "Duncan", "Woodward", "Southern", "Henley", "Treeby", "Cram", "Elsas", "Driggers",
+            "Warrick", "Overstreet", "Hindman", "Buck", "Sulyard", "Wentzel", "Swink", "Butt", "Schaeffer",
+            "Hoffhants", "Bould", "Willcox", "Lotherington", "Bagley", "Graff", "White", "Wheeler", "Sloan",
+            "Rodacker", "Hanford", "Jowers", "Kunkle", "Cass", "Powers", "Gilman", "Mcmichaels", "Hobbs", "Herndon",
+            "Prescott", "Smail", "Mcdonald", "Biery", "Orner", "Richards", "Mueller", "Isaman", "Bruxner", "Goodman",
+            "Barth", "Turzanski", "Vorrasi", "Stainforth", "Nehling", "Rahl", "Erschoff", "Greene", "Mckinnon",
+            "Reade", "Smith", "Pery", "Roose", "Greenwood", "Weisgarber", "Curry", "Holts", "Zadovsky", "Parrish",
+            "Putnam", "Munson", "Mcindoe", "Nickolson", "Brooks", "Bollinger", "Stroble", "Siegrist", "Fulton",
+            "Tomey", "Zoucks", "Roberts", "Otis", "Clarke", "Easter", "Johnson", "Fylbrigg", "Taylor", "Swartzbaugh",
+            "Weinstein", "Gadow", "Sayre", "Marcotte", "Wise", "Atweeke", "Mcfall", "Napier", "Eisenhart", "Canham",
+            "Sealis", "Baughman", "Gertraht", "Losey", "Laurence", "Eva", "Pershing", "Kern", "Pirl", "Rega",
+            "Sanborn", "Kanaga", "Sanders", "Anderson", "Dickinson", "Osteen", "Gettemy", "Crom", "Snyder", "Reed",
+            "Laurenzi", "Riggle", "Tillson", "Fowler", "Raub", "Jenner", "Koepple", "Soames", "Goldvogel", "Dimsdale",
+            "Zimmer", "Giesen", "Baker", "Beail", "Mortland", "Bard", "Sanner", "Knopsnider", "Jenkins", "Bailey",
+            "Werner", "Barrett", "Faust", "Agg", "Tomlinson", "Williams", "Little", "Greenawalt", "Wells", "Wilkins",
+            "Gisiko", "Bauerle", "Harrold", "Prechtl", "Polson", "Faast", "Winton", "Garneys", "Peters", "Potter",
+            "Porter", "Tennant", "Eve", "Dugger", "Jones", "Burch", "Cowper", "Whittier" };
+
+    public static String[] firstNames = { "Albert", "Jacquelin", "Dona", "Alia", "Mayme", "Genoveva", "Emma", "Lena",
+            "Melody", "Vilma", "Katelyn", "Jeremy", "Coral", "Leann", "Lita", "Gilda", "Kayla", "Alvina", "Maranda",
+            "Verlie", "Khadijah", "Karey", "Patrice", "Kallie", "Corey", "Mollie", "Daisy", "Melanie", "Sarita",
+            "Nichole", "Pricilla", "Terresa", "Berneice", "Arianne", "Brianne", "Lavinia", "Ulrike", "Lesha", "Adell",
+            "Ardelle", "Marisha", "Laquita", "Karyl", "Maryjane", "Kendall", "Isobel", "Raeann", "Heike", "Barbera",
+            "Norman", "Yasmine", "Nevada", "Mariam", "Edith", "Eugena", "Lovie", "Maren", "Bennie", "Lennie", "Tamera",
+            "Crystal", "Randi", "Anamaria", "Chantal", "Jesenia", "Avis", "Shela", "Randy", "Laurena", "Sharron",
+            "Christiane", "Lorie", "Mario", "Elizabeth", "Reina", "Adria", "Lakisha", "Brittni", "Azzie", "Dori",
+            "Shaneka", "Asuncion", "Katheryn", "Laurice", "Sharita", "Krystal", "Reva", "Inger", "Alpha", "Makeda",
+            "Anabel", "Loni", "Tiara", "Meda", "Latashia", "Leola", "Chin", "Daisey", "Ivory", "Amalia", "Logan",
+            "Tyler", "Kyong", "Carolann", "Maryetta", "Eufemia", "Anya", "Doreatha", "Lorna", "Rutha", "Ehtel",
+            "Debbie", "Chassidy", "Sang", "Christa", "Lottie", "Chun", "Karine", "Peggie", "Amina", "Melany", "Alayna",
+            "Scott", "Romana", "Naomi", "Christiana", "Salena", "Taunya", "Mitsue", "Regina", "Chelsie", "Charity",
+            "Dacia", "Aletha", "Latosha", "Lia", "Tamica", "Chery", "Bianca", "Shu", "Georgianne", "Myriam", "Austin",
+            "Wan", "Mallory", "Jana", "Georgie", "Jenell", "Kori", "Vicki", "Delfina", "June", "Mellisa", "Catherina",
+            "Claudie", "Tynisha", "Dayle", "Enriqueta", "Belen", "Pia", "Sarai", "Rosy", "Renay", "Kacie", "Frieda",
+            "Cayla", "Elissa", "Claribel", "Sabina", "Mackenzie", "Raina", "Cira", "Mitzie", "Aubrey", "Serafina",
+            "Maria", "Katharine", "Esperanza", "Sung", "Daria", "Billye", "Stefanie", "Kasha", "Holly", "Suzanne",
+            "Inga", "Flora", "Andria", "Genevie", "Eladia", "Janet", "Erline", "Renna", "Georgeanna", "Delorse",
+            "Elnora", "Rudy", "Rima", "Leanora", "Letisha", "Love", "Alverta", "Pinkie", "Domonique", "Jeannie",
+            "Jose", "Jacqueline", "Tara", "Lily", "Erna", "Tennille", "Galina", "Tamala", "Kirby", "Nichelle",
+            "Myesha", "Farah", "Santa", "Ludie", "Kenia", "Yee", "Micheline", "Maryann", "Elaina", "Ethelyn",
+            "Emmaline", "Shanell", "Marina", "Nila", "Alane", "Shakira", "Dorris", "Belinda", "Elois", "Barbie",
+            "Carita", "Gisela", "Lura", "Fransisca", "Helga", "Peg", "Leonarda", "Earlie", "Deetta", "Jacquetta",
+            "Blossom", "Kayleigh", "Deloras", "Keshia", "Christinia", "Dulce", "Bernie", "Sheba", "Lashanda", "Tula",
+            "Claretta", "Kary", "Jeanette", "Lupita", "Lenora", "Hisako", "Sherise", "Glynda", "Adela", "Chia",
+            "Sudie", "Mindy", "Caroyln", "Lindsey", "Xiomara", "Mercedes", "Onie", "Loan", "Alexis", "Tommie",
+            "Donette", "Monica", "Soo", "Camellia", "Lavera", "Valery", "Ariana", "Sophia", "Loris", "Ginette",
+            "Marielle", "Tari", "Julissa", "Alesia", "Suzanna", "Emelda", "Erin", "Ladawn", "Sherilyn", "Candice",
+            "Nereida", "Fairy", "Carl", "Joel", "Marilee", "Gracia", "Cordie", "So", "Shanita", "Drew", "Cassie",
+            "Sherie", "Marget", "Norma", "Delois", "Debera", "Chanelle", "Catarina", "Aracely", "Carlene", "Tricia",
+            "Aleen", "Katharina", "Marguerita", "Guadalupe", "Margorie", "Mandie", "Kathe", "Chong", "Sage", "Faith",
+            "Maryrose", "Stephany", "Ivy", "Pauline", "Susie", "Cristen", "Jenifer", "Annette", "Debi", "Karmen",
+            "Luci", "Shayla", "Hope", "Ocie", "Sharie", "Tami", "Breana", "Kerry", "Rubye", "Lashay", "Sondra",
+            "Katrice", "Brunilda", "Cortney", "Yan", "Zenobia", "Penni", "Addie", "Lavona", "Noel", "Anika",
+            "Herlinda", "Valencia", "Bunny", "Tory", "Victoria", "Carrie", "Mikaela", "Wilhelmina", "Chung",
+            "Hortencia", "Gerda", "Wen", "Ilana", "Sibyl", "Candida", "Victorina", "Chantell", "Casie", "Emeline",
+            "Dominica", "Cecila", "Delora", "Miesha", "Nova", "Sally", "Ronald", "Charlette", "Francisca", "Mina",
+            "Jenna", "Loraine", "Felisa", "Lulu", "Page", "Lyda", "Babara", "Flor", "Walter", "Chan", "Sherika",
+            "Kala", "Luna", "Vada", "Syreeta", "Slyvia", "Karin", "Renata", "Robbi", "Glenda", "Delsie", "Lizzie",
+            "Genia", "Caitlin", "Bebe", "Cory", "Sam", "Leslee", "Elva", "Caren", "Kasie", "Leticia", "Shannan",
+            "Vickey", "Sandie", "Kyle", "Chang", "Terrilyn", "Sandra", "Elida", "Marketta", "Elsy", "Tu", "Carman",
+            "Ashlie", "Vernia", "Albertine", "Vivian", "Elba", "Bong", "Margy", "Janetta", "Xiao", "Teofila", "Danyel",
+            "Nickole", "Aleisha", "Tera", "Cleotilde", "Dara", "Paulita", "Isela", "Maricela", "Rozella", "Marivel",
+            "Aurora", "Melissa", "Carylon", "Delinda", "Marvella", "Candelaria", "Deidre", "Tawanna", "Myrtie",
+            "Milagro", "Emilie", "Coretta", "Ivette", "Suzann", "Ammie", "Lucina", "Lory", "Tena", "Eleanor",
+            "Cherlyn", "Tiana", "Brianna", "Myra", "Flo", "Carisa", "Kandi", "Erlinda", "Jacqulyn", "Fermina", "Riva",
+            "Palmira", "Lindsay", "Annmarie", "Tamiko", "Carline", "Amelia", "Quiana", "Lashawna", "Veola", "Belva",
+            "Marsha", "Verlene", "Alex", "Leisha", "Camila", "Mirtha", "Melva", "Lina", "Arla", "Cythia", "Towanda",
+            "Aracelis", "Tasia", "Aurore", "Trinity", "Bernadine", "Farrah", "Deneen", "Ines", "Betty", "Lorretta",
+            "Dorethea", "Hertha", "Rochelle", "Juli", "Shenika", "Yung", "Lavon", "Deeanna", "Nakia", "Lynnette",
+            "Dinorah", "Nery", "Elene", "Carolee", "Mira", "Franchesca", "Lavonda", "Leida", "Paulette", "Dorine",
+            "Allegra", "Keva", "Jeffrey", "Bernardina", "Maryln", "Yoko", "Faviola", "Jayne", "Lucilla", "Charita",
+            "Ewa", "Ella", "Maggie", "Ivey", "Bettie", "Jerri", "Marni", "Bibi", "Sabrina", "Sarah", "Marleen",
+            "Katherin", "Remona", "Jamika", "Antonina", "Oliva", "Lajuana", "Fonda", "Sigrid", "Yael", "Billi",
+            "Verona", "Arminda", "Mirna", "Tesha", "Katheleen", "Bonita", "Kamilah", "Patrica", "Julio", "Shaina",
+            "Mellie", "Denyse", "Deandrea", "Alena", "Meg", "Kizzie", "Krissy", "Karly", "Alleen", "Yahaira", "Lucie",
+            "Karena", "Elaine", "Eloise", "Buena", "Marianela", "Renee", "Nan", "Carolynn", "Windy", "Avril", "Jane",
+            "Vida", "Thea", "Marvel", "Rosaline", "Tifany", "Robena", "Azucena", "Carlota", "Mindi", "Andera", "Jenny",
+            "Courtney", "Lyndsey", "Willette", "Kristie", "Shaniqua", "Tabatha", "Ngoc", "Una", "Marlena", "Louetta",
+            "Vernie", "Brandy", "Jacquelyne", "Jenelle", "Elna", "Erminia", "Ida", "Audie", "Louis", "Marisol",
+            "Shawana", "Harriette", "Karol", "Kitty", "Esmeralda", "Vivienne", "Eloisa", "Iris", "Jeanice", "Cammie",
+            "Jacinda", "Shena", "Floy", "Theda", "Lourdes", "Jayna", "Marg", "Kati", "Tanna", "Rosalyn", "Maxima",
+            "Soon", "Angelika", "Shonna", "Merle", "Kassandra", "Deedee", "Heidi", "Marti", "Renae", "Arleen",
+            "Alfredia", "Jewell", "Carley", "Pennie", "Corina", "Tonisha", "Natividad", "Lilliana", "Darcie", "Shawna",
+            "Angel", "Piedad", "Josefa", "Rebbeca", "Natacha", "Nenita", "Petrina", "Carmon", "Chasidy", "Temika",
+            "Dennise", "Renetta", "Augusta", "Shirlee", "Valeri", "Casimira", "Janay", "Berniece", "Deborah", "Yaeko",
+            "Mimi", "Digna", "Irish", "Cher", "Yong", "Lucila", "Jimmie", "Junko", "Lezlie", "Waneta", "Sandee",
+            "Marquita", "Eura", "Freeda", "Annabell", "Laree", "Jaye", "Wendy", "Toshia", "Kylee", "Aleta", "Emiko",
+            "Clorinda", "Sixta", "Audrea", "Juanita", "Birdie", "Reita", "Latanya", "Nia", "Leora", "Laurine",
+            "Krysten", "Jerrie", "Chantel", "Ira", "Sena", "Andre", "Jann", "Marla", "Precious", "Katy", "Gabrielle",
+            "Yvette", "Brook", "Shirlene", "Eldora", "Laura", "Milda", "Euna", "Jettie", "Debora", "Lise", "Edythe",
+            "Leandra", "Shandi", "Araceli", "Johanne", "Nieves", "Denese", "Carmelita", "Nohemi", "Annice", "Natalie",
+            "Yolande", "Jeffie", "Vashti", "Vickie", "Obdulia", "Youlanda", "Lupe", "Tomoko", "Monserrate", "Domitila",
+            "Etsuko", "Adrienne", "Lakesha", "Melissia", "Odessa", "Meagan", "Veronika", "Jolyn", "Isabelle", "Leah",
+            "Rhiannon", "Gianna", "Audra", "Sommer", "Renate", "Perla", "Thao", "Myong", "Lavette", "Mark", "Emilia",
+            "Ariane", "Karl", "Dorie", "Jacquie", "Mia", "Malka", "Shenita", "Tashina", "Christine", "Cherri", "Roni",
+            "Fran", "Mildred", "Sara", "Clarissa", "Fredia", "Elease", "Samuel", "Earlene", "Vernita", "Mae", "Concha",
+            "Renea", "Tamekia", "Hye", "Ingeborg", "Tessa", "Kelly", "Kristin", "Tam", "Sacha", "Kanisha", "Jillian",
+            "Tiffanie", "Ashlee", "Madelyn", "Donya", "Clementine", "Mickie", "My", "Zena", "Terrie", "Samatha",
+            "Gertie", "Tarra", "Natalia", "Sharlene", "Evie", "Shalon", "Rosalee", "Numbers", "Jodi", "Hattie",
+            "Naoma", "Valene", "Whitley", "Claude", "Alline", "Jeanne", "Camie", "Maragret", "Viola", "Kris", "Marlo",
+            "Arcelia", "Shari", "Jalisa", "Corrie", "Eleonor", "Angelyn", "Merry", "Lauren", "Melita", "Gita",
+            "Elenor", "Aurelia", "Janae", "Lyndia", "Margeret", "Shawanda", "Rolande", "Shirl", "Madeleine", "Celinda",
+            "Jaleesa", "Shemika", "Joye", "Tisa", "Trudie", "Kathrine", "Clarita", "Dinah", "Georgia", "Antoinette",
+            "Janis", "Suzette", "Sherri", "Herta", "Arie", "Hedy", "Cassi", "Audrie", "Caryl", "Jazmine", "Jessica",
+            "Beverly", "Elizbeth", "Marylee", "Londa", "Fredericka", "Argelia", "Nana", "Donnette", "Damaris",
+            "Hailey", "Jamee", "Kathlene", "Glayds", "Lydia", "Apryl", "Verla", "Adam", "Concepcion", "Zelda",
+            "Shonta", "Vernice", "Detra", "Meghann", "Sherley", "Sheri", "Kiyoko", "Margarita", "Adaline", "Mariela",
+            "Velda", "Ailene", "Juliane", "Aiko", "Edyth", "Cecelia", "Shavon", "Florance", "Madeline", "Rheba",
+            "Deann", "Ignacia", "Odelia", "Heide", "Mica", "Jennette", "Maricruz", "Ouida", "Darcy", "Laure",
+            "Justina", "Amada", "Laine", "Cruz", "Sunny", "Francene", "Roxanna", "Nam", "Nancie", "Deanna", "Letty",
+            "Britni", "Kazuko", "Lacresha", "Simon", "Caleb", "Milton", "Colton", "Travis", "Miles", "Jonathan",
+            "Logan", "Rolf", "Emilio", "Roberto", "Marcus", "Tim", "Delmar", "Devon", "Kurt", "Edward", "Jeffrey",
+            "Elvis", "Alfonso", "Blair", "Wm", "Sheldon", "Leonel", "Michal", "Federico", "Jacques", "Leslie",
+            "Augustine", "Hugh", "Brant", "Hong", "Sal", "Modesto", "Curtis", "Jefferey", "Adam", "John", "Glenn",
+            "Vance", "Alejandro", "Refugio", "Lucio", "Demarcus", "Chang", "Huey", "Neville", "Preston", "Bert",
+            "Abram", "Foster", "Jamison", "Kirby", "Erich", "Manual", "Dustin", "Derrick", "Donnie", "Jospeh", "Chris",
+            "Josue", "Stevie", "Russ", "Stanley", "Nicolas", "Samuel", "Waldo", "Jake", "Max", "Ernest", "Reinaldo",
+            "Rene", "Gale", "Morris", "Nathan", "Maximo", "Courtney", "Theodore", "Octavio", "Otha", "Delmer",
+            "Graham", "Dean", "Lowell", "Myles", "Colby", "Boyd", "Adolph", "Jarrod", "Nick", "Mark", "Clinton", "Kim",
+            "Sonny", "Dalton", "Tyler", "Jody", "Orville", "Luther", "Rubin", "Hollis", "Rashad", "Barton", "Vicente",
+            "Ted", "Rick", "Carmine", "Clifton", "Gayle", "Christopher", "Jessie", "Bradley", "Clay", "Theo", "Josh",
+            "Mitchell", "Boyce", "Chung", "Eugenio", "August", "Norbert", "Sammie", "Jerry", "Adan", "Edmundo",
+            "Homer", "Hilton", "Tod", "Kirk", "Emmett", "Milan", "Quincy", "Jewell", "Herb", "Steve", "Carmen",
+            "Bobby", "Odis", "Daron", "Jeremy", "Carl", "Hunter", "Tuan", "Thurman", "Asa", "Brenton", "Shane",
+            "Donny", "Andreas", "Teddy", "Dario", "Cyril", "Hoyt", "Teodoro", "Vincenzo", "Hilario", "Daren",
+            "Agustin", "Marquis", "Ezekiel", "Brendan", "Johnson", "Alden", "Richie", "Granville", "Chad", "Joseph",
+            "Lamont", "Jordon", "Gilberto", "Chong", "Rosendo", "Eddy", "Rob", "Dewitt", "Andre", "Titus", "Russell",
+            "Rigoberto", "Dick", "Garland", "Gabriel", "Hank", "Darius", "Ignacio", "Lazaro", "Johnie", "Mauro",
+            "Edmund", "Trent", "Harris", "Osvaldo", "Marvin", "Judson", "Rodney", "Randall", "Renato", "Richard",
+            "Denny", "Jon", "Doyle", "Cristopher", "Wilson", "Christian", "Jamie", "Roland", "Ken", "Tad", "Romeo",
+            "Seth", "Quinton", "Byron", "Ruben", "Darrel", "Deandre", "Broderick", "Harold", "Ty", "Monroe", "Landon",
+            "Mohammed", "Angel", "Arlen", "Elias", "Andres", "Carlton", "Numbers", "Tony", "Thaddeus", "Issac",
+            "Elmer", "Antoine", "Ned", "Fermin", "Grover", "Benito", "Abdul", "Cortez", "Eric", "Maxwell", "Coy",
+            "Gavin", "Rich", "Andy", "Del", "Giovanni", "Major", "Efren", "Horacio", "Joaquin", "Charles", "Noah",
+            "Deon", "Pasquale", "Reed", "Fausto", "Jermaine", "Irvin", "Ray", "Tobias", "Carter", "Yong", "Jorge",
+            "Brent", "Daniel", "Zane", "Walker", "Thad", "Shaun", "Jaime", "Mckinley", "Bradford", "Nathanial",
+            "Jerald", "Aubrey", "Virgil", "Abel", "Philip", "Chester", "Chadwick", "Dominick", "Britt", "Emmitt",
+            "Ferdinand", "Julian", "Reid", "Santos", "Dwain", "Morgan", "James", "Marion", "Micheal", "Eddie", "Brett",
+            "Stacy", "Kerry", "Dale", "Nicholas", "Darrick", "Freeman", "Scott", "Newton", "Sherman", "Felton",
+            "Cedrick", "Winfred", "Brad", "Fredric", "Dewayne", "Virgilio", "Reggie", "Edgar", "Heriberto", "Shad",
+            "Timmy", "Javier", "Nestor", "Royal", "Lynn", "Irwin", "Ismael", "Jonas", "Wiley", "Austin", "Kieth",
+            "Gonzalo", "Paris", "Earnest", "Arron", "Jarred", "Todd", "Erik", "Maria", "Chauncey", "Neil", "Conrad",
+            "Maurice", "Roosevelt", "Jacob", "Sydney", "Lee", "Basil", "Louis", "Rodolfo", "Rodger", "Roman", "Corey",
+            "Ambrose", "Cristobal", "Sylvester", "Benton", "Franklin", "Marcelo", "Guillermo", "Toby", "Jeramy",
+            "Donn", "Danny", "Dwight", "Clifford", "Valentine", "Matt", "Jules", "Kareem", "Ronny", "Lonny", "Son",
+            "Leopoldo", "Dannie", "Gregg", "Dillon", "Orlando", "Weston", "Kermit", "Damian", "Abraham", "Walton",
+            "Adrian", "Rudolf", "Will", "Les", "Norberto", "Fred", "Tyrone", "Ariel", "Terry", "Emmanuel", "Anderson",
+            "Elton", "Otis", "Derek", "Frankie", "Gino", "Lavern", "Jarod", "Kenny", "Dane", "Keenan", "Bryant",
+            "Eusebio", "Dorian", "Ali", "Lucas", "Wilford", "Jeremiah", "Warner", "Woodrow", "Galen", "Bob",
+            "Johnathon", "Amado", "Michel", "Harry", "Zachery", "Taylor", "Booker", "Hershel", "Mohammad", "Darrell",
+            "Kyle", "Stuart", "Marlin", "Hyman", "Jeffery", "Sidney", "Merrill", "Roy", "Garrett", "Porter", "Kenton",
+            "Giuseppe", "Terrance", "Trey", "Felix", "Buster", "Von", "Jackie", "Linwood", "Darron", "Francisco",
+            "Bernie", "Diego", "Brendon", "Cody", "Marco", "Ahmed", "Antonio", "Vince", "Brooks", "Kendrick", "Ross",
+            "Mohamed", "Jim", "Benny", "Gerald", "Pablo", "Charlie", "Antony", "Werner", "Hipolito", "Minh", "Mel",
+            "Derick", "Armand", "Fidel", "Lewis", "Donnell", "Desmond", "Vaughn", "Guadalupe", "Keneth", "Rodrick",
+            "Spencer", "Chas", "Gus", "Harlan", "Wes", "Carmelo", "Jefferson", "Gerard", "Jarvis", "Haywood", "Hayden",
+            "Sergio", "Gene", "Edgardo", "Colin", "Horace", "Dominic", "Aldo", "Adolfo", "Juan", "Man", "Lenard",
+            "Clement", "Everett", "Hal", "Bryon", "Mason", "Emerson", "Earle", "Laurence", "Columbus", "Lamar",
+            "Douglas", "Ian", "Fredrick", "Marc", "Loren", "Wallace", "Randell", "Noble", "Ricardo", "Rory", "Lindsey",
+            "Boris", "Bill", "Carlos", "Domingo", "Grant", "Craig", "Ezra", "Matthew", "Van", "Rudy", "Danial",
+            "Brock", "Maynard", "Vincent", "Cole", "Damion", "Ellsworth", "Marcel", "Markus", "Rueben", "Tanner",
+            "Reyes", "Hung", "Kennith", "Lindsay", "Howard", "Ralph", "Jed", "Monte", "Garfield", "Avery", "Bernardo",
+            "Malcolm", "Sterling", "Ezequiel", "Kristofer", "Luciano", "Casey", "Rosario", "Ellis", "Quintin",
+            "Trevor", "Miquel", "Jordan", "Arthur", "Carson", "Tyron", "Grady", "Walter", "Jonathon", "Ricky",
+            "Bennie", "Terrence", "Dion", "Dusty", "Roderick", "Isaac", "Rodrigo", "Harrison", "Zack", "Dee", "Devin",
+            "Rey", "Ulysses", "Clint", "Greg", "Dino", "Frances", "Wade", "Franklyn", "Jude", "Bradly", "Salvador",
+            "Rocky", "Weldon", "Lloyd", "Milford", "Clarence", "Alec", "Allan", "Bobbie", "Oswaldo", "Wilfred",
+            "Raleigh", "Shelby", "Willy", "Alphonso", "Arnoldo", "Robbie", "Truman", "Nicky", "Quinn", "Damien",
+            "Lacy", "Marcos", "Parker", "Burt", "Carroll", "Denver", "Buck", "Dong", "Normand", "Billie", "Edwin",
+            "Troy", "Arden", "Rusty", "Tommy", "Kenneth", "Leo", "Claud", "Joel", "Kendall", "Dante", "Milo", "Cruz",
+            "Lucien", "Ramon", "Jarrett", "Scottie", "Deshawn", "Ronnie", "Pete", "Alonzo", "Whitney", "Stefan",
+            "Sebastian", "Edmond", "Enrique", "Branden", "Leonard", "Loyd", "Olin", "Ron", "Rhett", "Frederic",
+            "Orval", "Tyrell", "Gail", "Eli", "Antonia", "Malcom", "Sandy", "Stacey", "Nickolas", "Hosea", "Santo",
+            "Oscar", "Fletcher", "Dave", "Patrick", "Dewey", "Bo", "Vito", "Blaine", "Randy", "Robin", "Winston",
+            "Sammy", "Edwardo", "Manuel", "Valentin", "Stanford", "Filiberto", "Buddy", "Zachariah", "Johnnie",
+            "Elbert", "Paul", "Isreal", "Jerrold", "Leif", "Owen", "Sung", "Junior", "Raphael", "Josef", "Donte",
+            "Allen", "Florencio", "Raymond", "Lauren", "Collin", "Eliseo", "Bruno", "Martin", "Lyndon", "Kurtis",
+            "Salvatore", "Erwin", "Michael", "Sean", "Davis", "Alberto", "King", "Rolland", "Joe", "Tory", "Chase",
+            "Dallas", "Vernon", "Beau", "Terrell", "Reynaldo", "Monty", "Jame", "Dirk", "Florentino", "Reuben", "Saul",
+            "Emory", "Esteban", "Michale", "Claudio", "Jacinto", "Kelley", "Levi", "Andrea", "Lanny", "Wendell",
+            "Elwood", "Joan", "Felipe", "Palmer", "Elmo", "Lawrence", "Hubert", "Rudolph", "Duane", "Cordell",
+            "Everette", "Mack", "Alan", "Efrain", "Trenton", "Bryan", "Tom", "Wilmer", "Clyde", "Chance", "Lou",
+            "Brain", "Justin", "Phil", "Jerrod", "George", "Kris", "Cyrus", "Emery", "Rickey", "Lincoln", "Renaldo",
+            "Mathew", "Luke", "Dwayne", "Alexis", "Jackson", "Gil", "Marty", "Burton", "Emil", "Glen", "Willian",
+            "Clemente", "Keven", "Barney", "Odell", "Reginald", "Aurelio", "Damon", "Ward", "Gustavo", "Harley",
+            "Peter", "Anibal", "Arlie", "Nigel", "Oren", "Zachary", "Scot", "Bud", "Wilbert", "Bart", "Josiah",
+            "Marlon", "Eldon", "Darryl", "Roger", "Anthony", "Omer", "Francis", "Patricia", "Moises", "Chuck",
+            "Waylon", "Hector", "Jamaal", "Cesar", "Julius", "Rex", "Norris", "Ollie", "Isaias", "Quentin", "Graig",
+            "Lyle", "Jeffry", "Karl", "Lester", "Danilo", "Mike", "Dylan", "Carlo", "Ryan", "Leon", "Percy", "Lucius",
+            "Jamel", "Lesley", "Joey", "Cornelius", "Rico", "Arnulfo", "Chet", "Margarito", "Ernie", "Nathanael",
+            "Amos", "Cleveland", "Luigi", "Alfonzo", "Phillip", "Clair", "Elroy", "Alva", "Hans", "Shon", "Gary",
+            "Jesus", "Cary", "Silas", "Keith", "Israel", "Willard", "Randolph", "Dan", "Adalberto", "Claude",
+            "Delbert", "Garry", "Mary", "Larry", "Riley", "Robt", "Darwin", "Barrett", "Steven", "Kelly", "Herschel",
+            "Darnell", "Scotty", "Armando", "Miguel", "Lawerence", "Wesley", "Garth", "Carol", "Micah", "Alvin",
+            "Billy", "Earl", "Pat", "Brady", "Cory", "Carey", "Bernard", "Jayson", "Nathaniel", "Gaylord", "Archie",
+            "Dorsey", "Erasmo", "Angelo", "Elisha", "Long", "Augustus", "Hobert", "Drew", "Stan", "Sherwood",
+            "Lorenzo", "Forrest", "Shawn", "Leigh", "Hiram", "Leonardo", "Gerry", "Myron", "Hugo", "Alvaro", "Leland",
+            "Genaro", "Jamey", "Stewart", "Elden", "Irving", "Olen", "Antone", "Freddy", "Lupe", "Joshua", "Gregory",
+            "Andrew", "Sang", "Wilbur", "Gerardo", "Merlin", "Williams", "Johnny", "Alex", "Tommie", "Jimmy",
+            "Donovan", "Dexter", "Gaston", "Tracy", "Jeff", "Stephen", "Berry", "Anton", "Darell", "Fritz", "Willis",
+            "Noel", "Mariano", "Crawford", "Zoey", "Alex", "Brianna", "Carlie", "Lloyd", "Cal", "Astor", "Randolf",
+            "Magdalene", "Trevelyan", "Terance", "Roy", "Kermit", "Harriett", "Crystal", "Laurinda", "Kiersten",
+            "Phyllida", "Liz", "Bettie", "Rena", "Colten", "Berenice", "Sindy", "Wilma", "Amos", "Candi", "Ritchie",
+            "Dirk", "Kathlyn", "Callista", "Anona", "Flossie", "Sterling", "Calista", "Regan", "Erica", "Jeana",
+            "Keaton", "York", "Nolan", "Daniel", "Benton", "Tommie", "Serenity", "Deanna", "Chas", "Heron", "Marlyn",
+            "Xylia", "Tristin", "Lyndon", "Andriana", "Madelaine", "Maddison", "Leila", "Chantelle", "Audrey",
+            "Connor", "Daley", "Tracee", "Tilda", "Eliot", "Merle", "Linwood", "Kathryn", "Silas", "Alvina",
+            "Phinehas", "Janis", "Alvena", "Zubin", "Gwendolen", "Caitlyn", "Bertram", "Hailee", "Idelle", "Homer",
+            "Jannah", "Delbert", "Rhianna", "Cy", "Jefferson", "Wayland", "Nona", "Tempest", "Reed", "Jenifer",
+            "Ellery", "Nicolina", "Aldous", "Prince", "Lexia", "Vinnie", "Doug", "Alberic", "Kayleen", "Woody",
+            "Rosanne", "Ysabel", "Skyler", "Twyla", "Geordie", "Leta", "Clive", "Aaron", "Scottie", "Celeste", "Chuck",
+            "Erle", "Lallie", "Jaycob", "Ray", "Carrie", "Laurita", "Noreen", "Meaghan", "Ulysses", "Andy", "Drogo",
+            "Dina", "Yasmin", "Mya", "Luvenia", "Urban", "Jacob", "Laetitia", "Sherry", "Love", "Michaela", "Deonne",
+            "Summer", "Brendon", "Sheena", "Mason", "Jayson", "Linden", "Salal", "Darrell", "Diana", "Hudson",
+            "Lennon", "Isador", "Charley", "April", "Ralph", "James", "Mina", "Jolyon", "Laurine", "Monna", "Carita",
+            "Munro", "Elsdon", "Everette", "Radclyffe", "Darrin", "Herbert", "Gawain", "Sheree", "Trudy", "Emmaline",
+            "Kassandra", "Rebecca", "Basil", "Jen", "Don", "Osborne", "Lilith", "Hannah", "Fox", "Rupert", "Paulene",
+            "Darius", "Wally", "Baptist", "Sapphire", "Tia", "Sondra", "Kylee", "Ashton", "Jepson", "Joetta", "Val",
+            "Adela", "Zacharias", "Zola", "Marmaduke", "Shannah", "Posie", "Oralie", "Brittany", "Ernesta", "Raymund",
+            "Denzil", "Daren", "Roosevelt", "Nelson", "Fortune", "Mariel", "Nick", "Jaden", "Upton", "Oz", "Margaux",
+            "Precious", "Albert", "Bridger", "Jimmy", "Nicola", "Rosalynne", "Keith", "Walt", "Della", "Joanna",
+            "Xenia", "Esmeralda", "Major", "Simon", "Rexana", "Stacy", "Calanthe", "Sherley", "Kaitlyn", "Graham",
+            "Ramsey", "Abbey", "Madlyn", "Kelvin", "Bill", "Rue", "Monica", "Caileigh", "Laraine", "Booker", "Jayna",
+            "Greta", "Jervis", "Sherman", "Kendrick", "Tommy", "Iris", "Geffrey", "Kaelea", "Kerr", "Garrick", "Jep",
+            "Audley", "Nic", "Bronte", "Beulah", "Patricia", "Jewell", "Deidra", "Cory", "Everett", "Harper",
+            "Charity", "Godfrey", "Jaime", "Sinclair", "Talbot", "Dayna", "Cooper", "Rosaline", "Jennie", "Eileen",
+            "Latanya", "Corinna", "Roxie", "Caesar", "Charles", "Pollie", "Lindsey", "Sorrel", "Dwight", "Jocelyn",
+            "Weston", "Shyla", "Valorie", "Bessie", "Josh", "Lessie", "Dayton", "Kathi", "Chasity", "Wilton", "Adam",
+            "William", "Ash", "Angela", "Ivor", "Ria", "Jazmine", "Hailey", "Jo", "Silvestra", "Ernie", "Clifford",
+            "Levi", "Matilda", "Quincey", "Camilla", "Delicia", "Phemie", "Laurena", "Bambi", "Lourdes", "Royston",
+            "Chastity", "Lynwood", "Elle", "Brenda", "Phoebe", "Timothy", "Raschelle", "Lilly", "Burt", "Rina",
+            "Rodney", "Maris", "Jaron", "Wilf", "Harlan", "Audra", "Vincent", "Elwyn", "Drew", "Wynter", "Ora",
+            "Lissa", "Virgil", "Xavier", "Chad", "Ollie", "Leyton", "Karolyn", "Skye", "Roni", "Gladys", "Dinah",
+            "Penny", "August", "Osmund", "Whitaker", "Brande", "Cornell", "Phil", "Zara", "Kilie", "Gavin", "Coty",
+            "Randy", "Teri", "Keira", "Pru", "Clemency", "Kelcey", "Nevil", "Poppy", "Gareth", "Christabel", "Bastian",
+            "Wynonna", "Roselyn", "Goddard", "Collin", "Trace", "Neal", "Effie", "Denys", "Virginia", "Richard",
+            "Isiah", "Harrietta", "Gaylord", "Diamond", "Trudi", "Elaine", "Jemmy", "Gage", "Annabel", "Quincy", "Syd",
+            "Marianna", "Philomena", "Aubree", "Kathie", "Jacki", "Kelley", "Bess", "Cecil", "Maryvonne", "Kassidy",
+            "Anselm", "Dona", "Darby", "Jamison", "Daryl", "Darell", "Teal", "Lennie", "Bartholomew", "Katie",
+            "Maybelline", "Kimball", "Elvis", "Les", "Flick", "Harley", "Beth", "Bidelia", "Montague", "Helen", "Ozzy",
+            "Stef", "Debra", "Maxene", "Stefanie", "Russ", "Avril", "Johnathan", "Orson", "Chelsey", "Josephine",
+            "Deshaun", "Wendell", "Lula", "Ferdinanda", "Greg", "Brad", "Kynaston", "Dena", "Russel", "Robertina",
+            "Misti", "Leon", "Anjelica", "Bryana", "Myles", "Judi", "Curtis", "Davin", "Kristia", "Chrysanta",
+            "Hayleigh", "Hector", "Osbert", "Eustace", "Cary", "Tansy", "Cayley", "Maryann", "Alissa", "Ike",
+            "Tranter", "Reina", "Alwilda", "Sidony", "Columbine", "Astra", "Jillie", "Stephania", "Jonah", "Kennedy",
+            "Ferdinand", "Allegria", "Donella", "Kelleigh", "Darian", "Eldreda", "Jayden", "Herbie", "Jake", "Winston",
+            "Vi", "Annie", "Cherice", "Hugo", "Tricia", "Haydee", "Cassarah", "Darden", "Mallory", "Alton", "Hadley",
+            "Romayne", "Lacey", "Ern", "Alayna", "Cecilia", "Seward", "Tilly", "Edgar", "Concordia", "Ibbie", "Dahlia",
+            "Oswin", "Stu", "Brett", "Maralyn", "Kristeen", "Dotty", "Robyn", "Nessa", "Tresha", "Guinevere",
+            "Emerson", "Haze", "Lyn", "Henderson", "Lexa", "Jaylen", "Gail", "Lizette", "Tiara", "Robbie", "Destiny",
+            "Alice", "Livia", "Rosy", "Leah", "Jan", "Zach", "Vita", "Gia", "Micheal", "Rowina", "Alysha", "Bobbi",
+            "Delores", "Osmond", "Karaugh", "Wilbur", "Kasandra", "Renae", "Kaety", "Dora", "Gaye", "Amaryllis",
+            "Katelyn", "Dacre", "Prudence", "Ebony", "Camron", "Jerrold", "Vivyan", "Randall", "Donna", "Misty",
+            "Damon", "Selby", "Esmund", "Rian", "Garry", "Julius", "Raelene", "Clement", "Dom", "Tibby", "Moss",
+            "Millicent", "Gwendoline", "Berry", "Ashleigh", "Lilac", "Quin", "Vere", "Creighton", "Harriet", "Malvina",
+            "Lianne", "Pearle", "Kizzie", "Kara", "Petula", "Jeanie", "Maria", "Pacey", "Victoria", "Huey", "Toni",
+            "Rose", "Wallis", "Diggory", "Josiah", "Delma", "Keysha", "Channing", "Prue", "Lee", "Ryan", "Sidney",
+            "Valerie", "Clancy", "Ezra", "Gilbert", "Clare", "Laz", "Crofton", "Mike", "Annabella", "Tara", "Eldred",
+            "Arthur", "Jaylon", "Peronel", "Paden", "Dot", "Marian", "Amyas", "Alexus", "Esmond", "Abbie", "Stanley",
+            "Brittani", "Vickie", "Errol", "Kimberlee", "Uland", "Ebenezer", "Howie", "Eveline", "Andrea", "Trish",
+            "Hopkin", "Bryanna", "Temperance", "Valarie", "Femie", "Alix", "Terrell", "Lewin", "Lorrin", "Happy",
+            "Micah", "Rachyl", "Sloan", "Gertrude", "Elizabeth", "Dorris", "Andra", "Bram", "Gary", "Jeannine",
+            "Maurene", "Irene", "Yolonda", "Jonty", "Coleen", "Cecelia", "Chantal", "Stuart", "Caris", "Ros",
+            "Kaleigh", "Mirabelle", "Kolby", "Primrose", "Susannah", "Ginny", "Jinny", "Dolly", "Lettice", "Sonny",
+            "Melva", "Ernest", "Garret", "Reagan", "Trenton", "Gallagher", "Edwin", "Nikolas", "Corrie", "Lynette",
+            "Ettie", "Sly", "Debbi", "Eudora", "Brittney", "Tacey", "Marius", "Anima", "Gordon", "Olivia", "Kortney",
+            "Shantel", "Kolleen", "Nevaeh", "Buck", "Sera", "Liliana", "Aric", "Kalyn", "Mick", "Libby", "Ingram",
+            "Alexandria", "Darleen", "Jacklyn", "Hughie", "Tyler", "Aida", "Ronda", "Deemer", "Taryn", "Laureen",
+            "Samantha", "Dave", "Hardy", "Baldric", "Montgomery", "Gus", "Ellis", "Titania", "Luke", "Chase", "Haidee",
+            "Mayra", "Isabell", "Trinity", "Milo", "Abigail", "Tacita", "Meg", "Hervey", "Natasha", "Sadie", "Holden",
+            "Dee", "Mansel", "Perry", "Randi", "Frederica", "Georgina", "Kolour", "Debbie", "Seraphina", "Elspet",
+            "Julyan", "Raven", "Zavia", "Jarvis", "Jaymes", "Grover", "Cairo", "Alea", "Jordon", "Braxton", "Donny",
+            "Rhoda", "Tonya", "Bee", "Alyssia", "Ashlyn", "Reanna", "Lonny", "Arlene", "Deb", "Jane", "Nikole",
+            "Bettina", "Harrison", "Tamzen", "Arielle", "Adelaide", "Faith", "Bridie", "Wilburn", "Fern", "Nan",
+            "Shaw", "Zeke", "Alan", "Dene", "Gina", "Alexa", "Bailey", "Sal", "Tammy", "Maximillian", "America",
+            "Sylvana", "Fitz", "Mo", "Marissa", "Cass", "Eldon", "Wilfrid", "Tel", "Joann", "Kendra", "Tolly",
+            "Leanne", "Ferdie", "Haven", "Lucas", "Marlee", "Cyrilla", "Red", "Phoenix", "Jazmin", "Carin", "Gena",
+            "Lashonda", "Tucker", "Genette", "Kizzy", "Winifred", "Melody", "Keely", "Kaylyn", "Radcliff", "Lettie",
+            "Foster", "Lyndsey", "Nicholas", "Farley", "Louisa", "Dana", "Dortha", "Francine", "Doran", "Bonita",
+            "Hal", "Sawyer", "Reginald", "Aislin", "Nathan", "Baylee", "Abilene", "Ladonna", "Maurine", "Shelly",
+            "Deandre", "Jasmin", "Roderic", "Tiffany", "Amanda", "Verity", "Wilford", "Gayelord", "Whitney", "Demelza",
+            "Kenton", "Alberta", "Kyra", "Tabitha", "Sampson", "Korey", "Lillian", "Edison", "Clayton", "Steph",
+            "Maya", "Dusty", "Jim", "Ronny", "Adrianne", "Bernard", "Harris", "Kiley", "Alexander", "Kisha", "Ethalyn",
+            "Patience", "Briony", "Indigo", "Aureole", "Makenzie", "Molly", "Sherilyn", "Barry", "Laverne", "Hunter",
+            "Rocky", "Tyreek", "Madalyn", "Phyliss", "Chet", "Beatrice", "Faye", "Lavina", "Madelyn", "Tracey",
+            "Gyles", "Patti", "Carlyn", "Stephanie", "Jackalyn", "Larrie", "Kimmy", "Isolda", "Emelina", "Lis",
+            "Zillah", "Cody", "Sheard", "Rufus", "Paget", "Mae", "Rexanne", "Luvinia", "Tamsen", "Rosanna", "Greig",
+            "Stacia", "Mabelle", "Quianna", "Lotus", "Delice", "Bradford", "Angus", "Cosmo", "Earlene", "Adrian",
+            "Arlie", "Noelle", "Sabella", "Isa", "Adelle", "Innocent", "Kirby", "Trixie", "Kenelm", "Nelda", "Melia",
+            "Kendal", "Dorinda", "Placid", "Linette", "Kam", "Sherisse", "Evan", "Ewart", "Janice", "Linton",
+            "Jacaline", "Charissa", "Douglas", "Aileen", "Kemp", "Oli", "Amethyst", "Rosie", "Nigella", "Sherill",
+            "Anderson", "Alanna", "Eric", "Claudia", "Jennifer", "Boniface", "Harriet", "Vernon", "Lucy", "Shawnee",
+            "Gerard", "Cecily", "Romey", "Randall", "Wade", "Lux", "Dawson", "Gregg", "Kade", "Roxanne", "Melinda",
+            "Rolland", "Rowanne", "Fannie", "Isidore", "Melia", "Harvie", "Salal", "Eleonor", "Jacquette", "Lavone",
+            "Shanika", "Tarquin", "Janet", "Josslyn", "Maegan", "Augusta", "Aubree", "Francene", "Martie", "Marisa",
+            "Tyreek", "Tatianna", "Caleb", "Sheridan", "Nellie", "Barbara", "Wat", "Jayla", "Esmaralda", "Graeme",
+            "Lavena", "Jemima", "Nikolas", "Triston", "Portia", "Kyla", "Marcus", "Raeburn", "Jamison", "Earl", "Wren",
+            "Leighton", "Lagina", "Lucasta", "Dina", "Amaranta", "Jessika", "Claud", "Bernard", "Winifred", "Ebba",
+            "Sammi", "Gall", "Chloe", "Ottoline", "Herbert", "Janice", "Gareth", "Channing", "Caleigh", "Kailee",
+            "Ralphie", "Tamzen", "Quincy", "Beaumont", "Albert", "Jadyn", "Violet", "Luanna", "Moriah", "Humbert",
+            "Jed", "Leona", "Hale", "Mitch", "Marlin", "Nivek", "Darwin", "Dirk", "Liliana", "Meadow", "Bernadine",
+            "Jorie", "Peyton", "Astra", "Roscoe", "Gina", "Lovell", "Jewel", "Romayne", "Rosy", "Imogene",
+            "Margaretta", "Lorinda", "Hopkin", "Bobby", "Flossie", "Bennie", "Horatio", "Jonah", "Lyn", "Deana",
+            "Juliana", "Blanch", "Wright", "Kendal", "Woodrow", "Tania", "Austyn", "Val", "Mona", "Charla", "Rudyard",
+            "Pamela", "Raven", "Zena", "Nicola", "Kaelea", "Conor", "Virgil", "Sonnie", "Goodwin", "Christianne",
+            "Linford", "Myron", "Denton", "Charita", "Brody", "Ginnie", "Harrison", "Jeanine", "Quin", "Isolda",
+            "Zoie", "Pearce", "Margie", "Larrie", "Angelina", "Marcia", "Jessamine", "Delilah", "Dick", "Luana",
+            "Delicia", "Lake", "Luvenia", "Vaughan", "Concordia", "Gayelord", "Cheyenne", "Felix", "Dorris", "Pen",
+            "Kristeen", "Parris", "Everitt", "Josephina", "Amy", "Tommie", "Adrian", "April", "Rosaline", "Zachery",
+            "Trace", "Phoebe", "Jenelle", "Kameron", "Katharine", "Media", "Colton", "Tad", "Quianna", "Kerenza",
+            "Greta", "Luvinia", "Pete", "Tonya", "Beckah", "Barbra", "Jon", "Tetty", "Corey", "Sylvana", "Kizzy",
+            "Korey", "Trey", "Haydee", "Penny", "Mandy", "Panda", "Coline", "Ramsey", "Sukie", "Annabel", "Sarina",
+            "Corbin", "Suzanna", "Rob", "Duana", "Shell", "Jason", "Eddy", "Rube", "Roseann", "Celia", "Brianne",
+            "Nerissa", "Jera", "Humphry", "Ashlynn", "Terrence", "Philippina", "Coreen", "Kolour", "Indiana", "Paget",
+            "Marlyn", "Hester", "Isbel", "Ocean", "Harris", "Leslie", "Vere", "Monroe", "Isabelle", "Bertie", "Clitus",
+            "Dave", "Alethea", "Lessie", "Louiza", "Madlyn", "Garland", "Wolf", "Lalo", "Donny", "Amabel", "Tianna",
+            "Louie", "Susie", "Mackenzie", "Renie", "Tess", "Marmaduke", "Gwendolen", "Bettina", "Beatrix", "Esmund",
+            "Minnie", "Carlie", "Barnabas", "Ruthie", "Honour", "Haylie", "Xavior", "Freddie", "Ericka", "Aretha",
+            "Edie", "Madelina", "Anson", "Tabby", "Derrick", "Jocosa", "Deirdre", "Aislin", "Chastity", "Abigail",
+            "Wynonna", "Zo", "Eldon", "Krystine", "Ghislaine", "Zavia", "Nolene", "Marigold", "Kelley", "Sylvester",
+            "Odell", "George", "Laurene", "Franklyn", "Clarice", "Mo", "Dustin", "Debbi", "Lina", "Tony", "Acacia",
+            "Hettie", "Natalee", "Marcie", "Brittany", "Elnora", "Rachel", "Dawn", "Basil", "Christal", "Anjelica",
+            "Fran", "Tawny", "Delroy", "Tameka", "Lillie", "Ceara", "Deanna", "Deshaun", "Ken", "Bradford", "Justina",
+            "Merle", "Draven", "Gretta", "Harriette", "Webster", "Nathaniel", "Anemone", "Coleen", "Ruth", "Chryssa",
+            "Hortensia", "Saffie", "Deonne", "Leopold", "Harlan", "Lea", "Eppie", "Lucinda", "Tilda", "Fanny", "Titty",
+            "Lockie", "Jepson", "Sherisse", "Maralyn", "Ethel", "Sly", "Ebenezer", "Canute", "Ella", "Freeman",
+            "Reuben", "Olivette", "Nona", "Rik", "Amice", "Kristine", "Kathie", "Jayne", "Jeri", "Mckenna", "Bertram",
+            "Kaylee", "Livia", "Gil", "Wallace", "Maryann", "Keeleigh", "Laurinda", "Doran", "Khloe", "Dakota",
+            "Yaron", "Kimberleigh", "Gytha", "Doris", "Marylyn", "Benton", "Linnette", "Esther", "Jakki", "Rowina",
+            "Marian", "Roselyn", "Norbert", "Maggie", "Caesar", "Phinehas", "Jerry", "Jasmine", "Antonette", "Miriam",
+            "Monna", "Maryvonne", "Jacquetta", "Bernetta", "Napier", "Annie", "Gladwin", "Sheldon", "Aric", "Elouise",
+            "Gawain", "Kristia", "Gabe", "Kyra", "Red", "Tod", "Dudley", "Lorraine", "Ryley", "Sabina", "Poppy",
+            "Leland", "Aileen", "Eglantine", "Alicia", "Jeni", "Addy", "Tiffany", "Geffrey", "Lavina", "Collin",
+            "Clover", "Vin", "Jerome", "Doug", "Vincent", "Florence", "Scarlet", "Celeste", "Desdemona", "Tiphanie",
+            "Kassandra", "Ashton", "Madison", "Art", "Magdalene", "Iona", "Josepha", "Anise", "Ferne", "Derek",
+            "Huffie", "Qiana", "Ysabel", "Tami", "Shannah", "Xavier", "Willard", "Winthrop", "Vickie", "Maura",
+            "Placid", "Tiara", "Reggie", "Elissa", "Isa", "Chrysanta", "Jeff", "Bessie", "Terri", "Amilia", "Brett",
+            "Daniella", "Damion", "Carolina", "Maximillian", "Travers", "Benjamin", "Oprah", "Darcy", "Yolanda",
+            "Nicolina", "Crofton", "Jarrett", "Kaitlin", "Shauna", "Keren", "Bevis", "Kalysta", "Sharron", "Alyssa",
+            "Blythe", "Zelma", "Caelie", "Norwood", "Billie", "Patrick", "Gary", "Cambria", "Tylar", "Mason", "Helen",
+            "Melyssa", "Gene", "Gilberta", "Carter", "Herbie", "Harmonie", "Leola", "Eugenia", "Clint", "Pauletta",
+            "Edwyna", "Georgina", "Teal", "Harper", "Izzy", "Dillon", "Kezia", "Evangeline", "Colene", "Madelaine",
+            "Zilla", "Rudy", "Dottie", "Caris", "Morton", "Marge", "Tacey", "Parker", "Troy", "Liza", "Lewin",
+            "Tracie", "Justine", "Dallas", "Linden", "Ray", "Loretta", "Teri", "Elvis", "Diane", "Julianna", "Manfred",
+            "Denise", "Eireen", "Ann", "Kenith", "Linwood", "Kathlyn", "Bernice", "Shelley", "Oswald", "Amedeus",
+            "Homer", "Tanzi", "Ted", "Ralphina", "Hyacinth", "Lotus", "Matthias", "Arlette", "Clark", "Cecil",
+            "Elspeth", "Alvena", "Noah", "Millard", "Brenden", "Cole", "Philipa", "Nina", "Thelma", "Iantha", "Reid",
+            "Jefferson", "Meg", "Elsie", "Shirlee", "Nathan", "Nancy", "Simona", "Racheal", "Carin", "Emory", "Delice",
+            "Kristi", "Karaugh", "Kaety", "Tilly", "Em", "Alanis", "Darrin", "Jerrie", "Hollis", "Cary", "Marly",
+            "Carita", "Jody", "Farley", "Hervey", "Rosalin", "Cuthbert", "Stewart", "Jodene", "Caileigh", "Briscoe",
+            "Dolores", "Sheree", "Eustace", "Nigel", "Detta", "Barret", "Rowland", "Kenny", "Githa", "Zoey", "Adela",
+            "Petronella", "Opal", "Coleman", "Niles", "Cyril", "Dona", "Alberic", "Allannah", "Jules", "Avalon",
+            "Hadley", "Thomas", "Renita", "Calanthe", "Heron", "Shawnda", "Chet", "Malina", "Manny", "Rina", "Frieda",
+            "Eveleen", "Deshawn", "Amos", "Raelene", "Paige", "Molly", "Nannie", "Ileen", "Brendon", "Milford",
+            "Unice", "Rebeccah", "Caedmon", "Gae", "Doreen", "Vivian", "Louis", "Raphael", "Vergil", "Lise", "Glenn",
+            "Karyn", "Terance", "Reina", "Jake", "Gordon", "Wisdom", "Isiah", "Gervase", "Fern", "Marylou", "Roddy",
+            "Justy", "Derick", "Shantelle", "Adam", "Chantel", "Madoline", "Emmerson", "Lexie", "Mickey", "Stephen",
+            "Dane", "Stacee", "Elwin", "Tracey", "Alexandra", "Ricky", "Ian", "Kasey", "Rita", "Alanna", "Georgene",
+            "Deon", "Zavier", "Ophelia", "Deforest", "Lowell", "Zubin", "Hardy", "Osmund", "Tabatha", "Debby",
+            "Katlyn", "Tallulah", "Priscilla", "Braden", "Wil", "Keziah", "Jen", "Aggie", "Korbin", "Lemoine",
+            "Barnaby", "Tranter", "Goldie", "Roderick", "Trina", "Emery", "Pris", "Sidony", "Adelle", "Tate", "Wilf",
+            "Zola", "Brande", "Chris", "Calanthia", "Lilly", "Kaycee", "Lashonda", "Jasmin", "Elijah", "Shantel",
+            "Simon", "Rosalind", "Jarod", "Kaylie", "Corrine", "Joselyn", "Archibald", "Mariabella", "Winton",
+            "Merlin", "Chad", "Ursula", "Kristopher", "Hewie", "Adrianna", "Lyndsay", "Jasmyn", "Tim", "Evette",
+            "Margaret", "Samson", "Bronte", "Terence", "Leila", "Candice", "Tori", "Jamey", "Coriander", "Conrad",
+            "Floyd", "Karen", "Lorin", "Maximilian", "Cairo", "Emily", "Yasmin", "Karolyn", "Bryan", "Lanny",
+            "Kimberly", "Rick", "Chaz", "Krystle", "Lyric", "Laura", "Garrick", "Flip", "Monty", "Brendan",
+            "Ermintrude", "Rayner", "Merla", "Titus", "Marva", "Patricia", "Leone", "Tracy", "Jaqueline", "Hallam",
+            "Delores", "Cressida", "Carlyle", "Leann", "Kelcey", "Laurence", "Ryan", "Reynold", "Mark", "Collyn",
+            "Audie", "Sammy", "Ellery", "Sallie", "Pamelia", "Adolph", "Lydia", "Titania", "Ron", "Bridger", "Aline",
+            "Read", "Kelleigh", "Weldon", "Irving", "Garey", "Diggory", "Evander", "Kylee", "Deidre", "Ormond",
+            "Laurine", "Reannon", "Arline", "Pat"
+
+    };
+
+    public static String[] jargon = { "wireless", "signal", "network", "3G", "plan", "touch-screen",
+            "customer-service", "reachability", "voice-command", "shortcut-menu", "customization", "platform", "speed",
+            "voice-clarity", "voicemail-service" };
+
+    public static String[] vendors = { "at&t", "verizon", "t-mobile", "sprint", "motorola", "samsung", "iphone" };
+
+    public static String[] org_list = { "Latsonity", "ganjalax", "Zuncan", "Lexitechno", "Hot-tech", "subtam",
+            "Coneflex", "Ganjatax", "physcane", "Tranzap", "Qvohouse", "Zununoing", "jaydax", "Keytech", "goldendexon",
+            "Villa-tech", "Trustbam", "Newcom", "Voltlane", "Ontohothex", "Ranhotfan", "Alphadax", "Transhigh",
+            "kin-ron", "Doublezone", "Solophase", "Vivaace", "silfind", "Basecone", "sonstreet", "Freshfix",
+            "Techitechi", "Kanelectrics", "linedexon", "Goldcity", "Newfase", "Technohow", "Zimcone", "Salthex",
+            "U-ron", "Solfix", "whitestreet", "Xx-technology", "Hexviafind", "over-it", "Strongtone", "Tripplelane",
+            "geomedia", "Scotcity", "Inchex", "Vaiatech", "Striptaxon", "Hatcom", "tresline", "Sanjodax", "freshdox",
+            "Sumlane", "Quadlane", "Newphase", "overtech", "Voltbam", "Icerunin", "Fixdintex", "Hexsanhex", "Statcode",
+            "Greencare", "U-electrics", "Zamcorporation", "Ontotanin", "Tanzimcare", "Groovetex", "Ganjastrip",
+            "Redelectronics", "Dandamace", "Whitemedia", "strongex", "Streettax", "highfax", "Mathtech", "Xx-drill",
+            "Sublamdox", "Unijobam", "Rungozoom", "Fixelectrics", "Villa-dox", "Ransaofan", "Plexlane", "itlab",
+            "Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
+            "Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
+            "zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGeneratorDriver.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGeneratorDriver.java
new file mode 100644
index 0000000..b2ba88e
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGeneratorDriver.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.Iterator;
+
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
+
+public class DataGeneratorDriver {
+
+    public static void main(String[] args) {
+
+        DataGenerator.initialize(new InitializationInfo());
+        Iterator<TweetMessage> tweetIterator = DataGenerator.getTwitterMessageIterator();
+        while (tweetIterator.hasNext()) {
+            System.out.println(tweetIterator.next().toString());
+        }
+    }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..a522336 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -40,240 +40,288 @@
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
- * An adapter that simulates a feed from the contents of a source file. The file can be on the local file
- * system or on HDFS. The feed ends when the content of the source file has been ingested.
+ * An adapter that simulates a feed from the contents of a source file. The file
+ * can be on the local file system or on HDFS. The feed ends when the content of
+ * the source file has been ingested.
  */
-public class RateControlledFileSystemBasedAdapter extends FileSystemBasedAdapter implements ITypedDatasourceAdapter,
-        IManagedFeedAdapter {
+public class RateControlledFileSystemBasedAdapter extends
+		FileSystemBasedAdapter implements ITypedDatasourceAdapter,
+		IManagedFeedAdapter {
 
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    public static final String KEY_FILE_SYSTEM = "fs";
-    public static final String LOCAL_FS = "localfs";
-    public static final String HDFS = "hdfs";
+	public static final String KEY_FILE_SYSTEM = "fs";
+	public static final String LOCAL_FS = "localfs";
+	public static final String HDFS = "hdfs";
 
-    private final FileSystemBasedAdapter coreAdapter;
-    private final Map<String, String> configuration;
-    private final String fileSystem;
-    private final String format;
+	private final FileSystemBasedAdapter coreAdapter;
+	private final Map<String, String> configuration;
+	private final String fileSystem;
+	private final String format;
 
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
-        super(atype);
-        checkRequiredArgs(configuration);
-        fileSystem = configuration.get(KEY_FILE_SYSTEM);
-        String adapterFactoryClass = null;
-        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
-        } else if (fileSystem.equals(HDFS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
-        } else {
-            throw new AsterixException("Unsupported file system type " + fileSystem);
-        }
-        format = configuration.get(KEY_FORMAT);
-        IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
-                adapterFactoryClass).newInstance();
-        coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
-        this.configuration = configuration;
-    }
+	public RateControlledFileSystemBasedAdapter(ARecordType atype,
+			Map<String, String> configuration) throws Exception {
+		super(atype);
+		checkRequiredArgs(configuration);
+		fileSystem = configuration.get(KEY_FILE_SYSTEM);
+		String adapterFactoryClass = null;
+		if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+			adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+		} else if (fileSystem.equals(HDFS)) {
+			adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+		} else {
+			throw new AsterixException("Unsupported file system type "
+					+ fileSystem);
+		}
+		format = configuration.get(KEY_FORMAT);
+		IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class
+				.forName(adapterFactoryClass).newInstance();
+		coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(
+				configuration, atype);
+		this.configuration = configuration;
+	}
 
-    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
-        if (configuration.get(KEY_FILE_SYSTEM) == null) {
-            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
-        }
-        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (output-type-name=?)");
-        }
-        if (configuration.get(KEY_PATH) == null) {
-            throw new Exception("File path not specified (path=?)");
-        }
-        if (configuration.get(KEY_FORMAT) == null) {
-            throw new Exception("File format not specified (format=?)");
-        }
-    }
+	private void checkRequiredArgs(Map<String, String> configuration)
+			throws Exception {
+		if (configuration.get(KEY_FILE_SYSTEM) == null) {
+			throw new Exception(
+					"File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+		}
+		if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+			throw new Exception(
+					"Record type not specified (output-type-name=?)");
+		}
+		if (configuration.get(KEY_PATH) == null) {
+			throw new Exception("File path not specified (path=?)");
+		}
+		if (configuration.get(KEY_FORMAT) == null) {
+			throw new Exception("File format not specified (format=?)");
+		}
+	}
 
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        return coreAdapter.getInputStream(partition);
-    }
+	@Override
+	public InputStream getInputStream(int partition) throws IOException {
+		return coreAdapter.getInputStream(partition);
+	}
 
-    @Override
-    public void initialize(IHyracksTaskContext ctx) throws Exception {
-        coreAdapter.initialize(ctx);
-        this.ctx = ctx;
-    }
+	@Override
+	public void initialize(IHyracksTaskContext ctx) throws Exception {
+		coreAdapter.initialize(ctx);
+		this.ctx = ctx;
+	}
 
-    @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        coreAdapter.configure(arguments);
-    }
+	@Override
+	public void configure(Map<String, String> arguments) throws Exception {
+		coreAdapter.configure(arguments);
+	}
 
-    @Override
-    public AdapterType getAdapterType() {
-        return coreAdapter.getAdapterType();
-    }
+	@Override
+	public AdapterType getAdapterType() {
+		return coreAdapter.getAdapterType();
+	}
 
-    @Override
-    protected ITupleParser getTupleParser() throws Exception {
-        ITupleParser parser = null;
-        if (format.equals(FORMAT_DELIMITED_TEXT)) {
-            parser = getRateControlledDelimitedDataTupleParser((ARecordType) atype);
-        } else if (format.equals(FORMAT_ADM)) {
-            parser = getRateControlledADMDataTupleParser((ARecordType) atype);
-        } else {
-            throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
-        }
-        return parser;
+	@Override
+	protected ITupleParser getTupleParser() throws Exception {
+		ITupleParser parser = null;
+		if (format.equals(FORMAT_DELIMITED_TEXT)) {
+			parser = getRateControlledDelimitedDataTupleParser((ARecordType) atype);
+		} else if (format.equals(FORMAT_ADM)) {
+			parser = getRateControlledADMDataTupleParser((ARecordType) atype);
+		} else {
+			throw new IllegalArgumentException(" format "
+					+ configuration.get(KEY_FORMAT) + " not supported");
+		}
+		return parser;
 
-    }
+	}
 
-    protected ITupleParser getRateControlledDelimitedDataTupleParser(ARecordType recordType) throws AsterixException {
-        ITupleParser parser;
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
-            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
+	protected ITupleParser getRateControlledDelimitedDataTupleParser(
+			ARecordType recordType) throws AsterixException {
+		ITupleParser parser;
+		int n = recordType.getFieldTypes().length;
+		IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+		for (int i = 0; i < n; i++) {
+			ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+			IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+			if (vpf == null) {
+				throw new NotImplementedException(
+						"No value parser factory for delimited fields of type "
+								+ tag);
+			}
+			fieldParserFactories[i] = vpf;
 
-        }
-        String delimiterValue = (String) configuration.get(KEY_DELIMITER);
-        if (delimiterValue != null && delimiterValue.length() > 1) {
-            throw new AsterixException("improper delimiter");
-        }
+		}
+		String delimiterValue = (String) configuration.get(KEY_DELIMITER);
+		if (delimiterValue != null && delimiterValue.length() > 1) {
+			throw new AsterixException("improper delimiter");
+		}
 
-        Character delimiter = delimiterValue.charAt(0);
-        parser = new RateControlledTupleParserFactory(recordType, fieldParserFactories, delimiter, configuration)
-                .createTupleParser(ctx);
-        return parser;
-    }
+		Character delimiter = delimiterValue.charAt(0);
+		parser = new RateControlledTupleParserFactory(recordType,
+				fieldParserFactories, delimiter, configuration)
+				.createTupleParser(ctx);
+		return parser;
+	}
 
-    protected ITupleParser getRateControlledADMDataTupleParser(ARecordType recordType) throws AsterixException {
-        ITupleParser parser = null;
-        try {
-            parser = new RateControlledTupleParserFactory(recordType, configuration).createTupleParser(ctx);
-            return parser;
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
+	protected ITupleParser getRateControlledADMDataTupleParser(
+			ARecordType recordType) throws AsterixException {
+		ITupleParser parser = null;
+		try {
+			parser = new RateControlledTupleParserFactory(recordType,
+					configuration).createTupleParser(ctx);
+			return parser;
+		} catch (Exception e) {
+			throw new AsterixException(e);
+		}
 
-    }
+	}
 
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return (ARecordType) atype;
-    }
+	@Override
+	public ARecordType getAdapterOutputType() {
+		return (ARecordType) atype;
+	}
 
-    @Override
-    public void alter(Map<String, String> properties) {
-        ((RateControlledTupleParser) parser).setInterTupleInterval(Long.parseLong(properties
-                .get(RateControlledTupleParser.INTER_TUPLE_INTERVAL)));
-    }
+	@Override
+	public void alter(Map<String, String> properties) {
+		((RateControlledTupleParser) parser).setInterTupleInterval(Long
+				.parseLong(properties
+						.get(RateControlledTupleParser.INTER_TUPLE_INTERVAL)));
+	}
 
-    @Override
-    public void stop() {
-        ((RateControlledTupleParser) parser).stop();
-    }
+	@Override
+	public void stop() {
+		((RateControlledTupleParser) parser).stop();
+	}
 
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return coreAdapter.getPartitionConstraint();
-    }
+	@Override
+	public AlgebricksPartitionConstraint getPartitionConstraint()
+			throws Exception {
+		return coreAdapter.getPartitionConstraint();
+	}
 }
 
 class RateControlledTupleParserFactory implements ITupleParserFactory {
 
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    private final ARecordType recordType;
-    private final IDataParser dataParser;
-    private final Map<String, String> configuration;
+	private final ARecordType recordType;
+	private final IDataParser dataParser;
+	private final Map<String, String> configuration;
 
-    public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, String> configuration) {
-        this.recordType = recordType;
-        dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
-        this.configuration = configuration;
-    }
+	public RateControlledTupleParserFactory(ARecordType recordType,
+			IValueParserFactory[] valueParserFactories, char fieldDelimiter,
+			Map<String, String> configuration) {
+		this.recordType = recordType;
+		dataParser = new DelimitedDataParser(recordType, valueParserFactories,
+				fieldDelimiter);
+		this.configuration = configuration;
+	}
 
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
-        this.recordType = recordType;
-        dataParser = new ADMDataParser();
-        this.configuration = configuration;
-    }
+	public RateControlledTupleParserFactory(ARecordType recordType,
+			Map<String, String> configuration) {
+		this.recordType = recordType;
+		dataParser = new ADMDataParser();
+		this.configuration = configuration;
+	}
 
-    @Override
-    public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
-        return new RateControlledTupleParser(ctx, recordType, dataParser, configuration);
-    }
+	@Override
+	public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
+		return new RateControlledTupleParser(ctx, recordType, dataParser,
+				configuration);
+	}
 
 }
 
 class RateControlledTupleParser extends AbstractTupleParser {
 
-    private final IDataParser dataParser;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-    private boolean continueIngestion = true;
+	private final IDataParser dataParser;
+	private long interTupleInterval;
+	private int failAfterNRecords = NO_FAILURE;
+	private boolean delayConfigured;
+	private boolean continueIngestion = true;
+	private boolean failureConfigured = false;
+	private int tupleCount = 0;
+	private FailurePattern failurePattern = null;
 
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+	public static final int NO_FAILURE = -1;
+	public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+	public static final String FAIL_AFTER = "fail-after";
+	public static final String FAIL_PATTERN = "failure-pattern";
 
-    public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) {
-        super(ctx, recType);
-        this.dataParser = dataParser;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        } else {
-            interTupleInterval = 0;
-        }
-        delayConfigured = interTupleInterval != 0;
-    }
+	public enum FailurePattern {
+		ONCE, REPEAT
+	}
 
-    public void setInterTupleInterval(long val) {
-        this.interTupleInterval = val;
-        this.delayConfigured = val > 0;
-    }
+	public RateControlledTupleParser(IHyracksTaskContext ctx,
+			ARecordType recType, IDataParser dataParser,
+			Map<String, String> configuration) {
+		super(ctx, recType);
+		this.dataParser = dataParser;
+		String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+		if (propValue != null) {
+			interTupleInterval = Long.parseLong(propValue);
+		} else {
+			interTupleInterval = 0;
+		}
+		delayConfigured = interTupleInterval != 0;
 
-    public void stop() {
-        continueIngestion = false;
-    }
+		propValue = configuration.get(FAIL_AFTER);
 
-    @Override
-    public IDataParser getDataParser() {
-        return dataParser;
-    }
+		if (propValue != null) {
+			failAfterNRecords = Integer.parseInt(propValue);
+			failureConfigured = failAfterNRecords != NO_FAILURE;
+		}
+	}
 
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+	public void setInterTupleInterval(long val) {
+		this.interTupleInterval = val;
+		this.delayConfigured = val > 0;
+	}
 
-        appender.reset(frame, true);
-        IDataParser parser = getDataParser();
-        try {
-            parser.initialize(in, recType, true);
-            while (continueIngestion) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                if (delayConfigured) {
-                    Thread.sleep(interTupleInterval);
-                }
-                addTupleToFrame(writer);
-            }
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame, writer);
-            }
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        } catch (InterruptedException ie) {
-            throw new HyracksDataException(ie);
-        }
-    }
+	public void stop() {
+		continueIngestion = false;
+	}
+
+	@Override
+	public IDataParser getDataParser() {
+		return dataParser;
+	}
+
+	@Override
+	public void parse(InputStream in, IFrameWriter writer)
+			throws HyracksDataException {
+
+		appender.reset(frame, true);
+		IDataParser parser = getDataParser();
+		try {
+			parser.initialize(in, recType, true);
+			while (continueIngestion) {
+				tb.reset();
+				if (!parser.parse(tb.getDataOutput())) {
+					break;
+				}
+				tb.addFieldEndOffset();
+				if (delayConfigured) {
+					Thread.sleep(interTupleInterval);
+				}
+				addTupleToFrame(writer);
+				tupleCount++;
+				if (failureConfigured) {
+					if (tupleCount > failAfterNRecords) {
+						throw new HyracksDataException(
+								" inject failure at tuple no " + tupleCount);
+					}
+				}
+			}
+			if (appender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(frame, writer);
+			}
+		} catch (AsterixException ae) {
+			throw new HyracksDataException(ae);
+		} catch (IOException ioe) {
+			throw new HyracksDataException(ioe);
+		} catch (InterruptedException ie) {
+			throw new HyracksDataException(ie);
+		}
+	}
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
new file mode 100644
index 0000000..f66d9e1
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -0,0 +1,223 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedFeedClient;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutablePoint;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.Message;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessageIterator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * TPS can be configured between 1 and 20,000  
+ * @author ramang
+ *
+ */
+public class SyntheticTwitterFeedAdapter extends PullBasedAdapter {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+
+    public SyntheticTwitterFeedAdapter(Map<String, String> configuration) throws AsterixException {
+        this.configuration = configuration;
+
+        String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
+                "followers_count" };
+
+        IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
+                BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
+        ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
+
+        String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
+                "message-text" };
+
+        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
+                BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
+        adapterOutputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.READ;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+
+    }
+
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        this.ctx = ctx;
+    }
+
+    @Override
+    public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+        return new OkSyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    private static class OkSyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
+
+        private static final Logger LOGGER = Logger.getLogger(OkSyntheticTwitterFeedClient.class.getName());
+
+        public static final String KEY_DURATION = "duration";
+        public static final String KEY_TPS = "tps";
+
+        private int duration;
+        private long tweetInterval;
+        private int numTweetsBeforeDelay;
+        private TweetMessageIterator tweetIterator = null;
+
+        private IAObject[] mutableFields;
+        private ARecordType outputRecordType;
+        private int partition;
+        private int tweetCount = 0;
+
+        public OkSyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType,
+                int partition) throws AsterixException {
+            this.outputRecordType = outputRecordType;
+            String value = configuration.get(KEY_DURATION);
+            duration = value != null ? Integer.parseInt(value) : 60;
+            initializeTweetRate(configuration.get(KEY_TPS));
+            InitializationInfo info = new InitializationInfo();
+            info.timeDurationInSecs = duration;
+            DataGenerator.initialize(info);
+            tweetIterator = new TweetMessageIterator(duration);
+            initialize();
+        }
+
+        private void initializeTweetRate(String tps) {
+            numTweetsBeforeDelay = 0;
+            if (tps == null) {
+                tweetInterval = 0;
+            } else {
+                int val = Integer.parseInt(tps);
+                double interval = new Double(((double) 1000 / val));
+                if (interval > 1) {
+                    tweetInterval = (long) interval;
+                    numTweetsBeforeDelay = 1;
+                } else {
+                    tweetInterval = 1;
+                    Double numTweets = new Double(1 / interval);
+                    if (numTweets.intValue() != numTweets) {
+                        tweetInterval = 10;
+                        numTweetsBeforeDelay = (new Double(10 * numTweets * 1.4)).intValue();
+                    } else {
+                        numTweetsBeforeDelay = new Double((numTweets * 1.4)).intValue();
+                    }
+                }
+            }
+
+        }
+
+        private void writeTweet(TweetMessage next) {
+
+            //tweet id
+            ((AMutableString) mutableFields[0]).setValue(next.getTweetid());
+            mutableRecord.setValueAtPos(0, mutableFields[0]);
+
+            // user 
+            AMutableRecord userRecord = ((AMutableRecord) mutableFields[1]);
+            ((AMutableString) userRecord.getValueByPos(0)).setValue(next.getUser().getScreenName());
+            ((AMutableString) userRecord.getValueByPos(1)).setValue("en");
+            ((AMutableInt32) userRecord.getValueByPos(2)).setValue(next.getUser().getFriendsCount());
+            ((AMutableInt32) userRecord.getValueByPos(3)).setValue(next.getUser().getStatusesCount());
+            ((AMutableString) userRecord.getValueByPos(4)).setValue(next.getUser().getName());
+            ((AMutableInt32) userRecord.getValueByPos(5)).setValue(next.getUser().getFollowersCount());
+            mutableRecord.setValueAtPos(1, userRecord);
+
+            // location
+            ((AMutablePoint) mutableFields[2]).setValue(next.getSenderLocation().getLatitude(), next
+                    .getSenderLocation().getLongitude());
+            mutableRecord.setValueAtPos(2, mutableFields[2]);
+
+            // time
+            ((AMutableDateTime) mutableFields[3]).setValue(next.getSendTime().getChrononTime());
+            mutableRecord.setValueAtPos(3, mutableFields[3]);
+
+            // referred topics
+            ((AMutableUnorderedList) mutableFields[4]).clear();
+            List<String> referredTopics = next.getReferredTopics();
+            for (String topic : referredTopics) {
+                ((AMutableUnorderedList) mutableFields[4]).add(new AMutableString(topic));
+            }
+            mutableRecord.setValueAtPos(4, mutableFields[4]);
+
+            // text
+            Message m = next.getMessageText();
+            char[] content = m.getMessage();
+            ((AMutableString) mutableFields[5]).setValue(new String(content, 0, m.getLength()));
+            mutableRecord.setValueAtPos(5, mutableFields[5]);
+
+        }
+
+        @Override
+        public void resetOnFailure(Exception e) throws AsterixException {
+            // TODO Auto-generated method stub
+
+        }
+
+        @Override
+        public boolean alter(Map<String, String> configuration) {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public InflowState setNextRecord() throws Exception {
+            boolean moreData = tweetIterator.hasNext();
+            if (!moreData) {
+                return InflowState.NO_MORE_DATA;
+            }
+            writeTweet(tweetIterator.next());
+            if (tweetInterval != 0) {
+                tweetCount++;
+                if (tweetCount == numTweetsBeforeDelay) {
+                    Thread.sleep(tweetInterval);
+                    tweetCount = 0;
+                }
+            }
+            return InflowState.DATA_AVAILABLE;
+        }
+
+        private void initialize() throws AsterixException {
+            ARecordType userRecordType = (ARecordType) outputRecordType.getFieldTypes()[1];
+            IAObject[] userMutableFields = new IAObject[] { new AMutableString(""), new AMutableString(""),
+                    new AMutableInt32(0), new AMutableInt32(0), new AMutableString(""), new AMutableInt32(0) };
+            AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+            mutableFields = new IAObject[] { new AMutableString(""),
+                    new AMutableRecord(userRecordType, userMutableFields), new AMutablePoint(0, 0),
+                    new AMutableDateTime(0), new AMutableUnorderedList(unorderedListType), new AMutableString("") };
+            recordSerDe = new ARecordSerializerDeserializer(outputRecordType);
+            mutableRecord = new AMutableRecord(outputRecordType, mutableFields);
+
+        }
+    }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
new file mode 100644
index 0000000..5883a62
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+
+/**
+ * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The
+ * adapter simulates a feed from the contents of a source file. The file can be
+ * on the local file system or on HDFS. The feed ends when the content of the
+ * source file has been ingested.
+ */
+public class SyntheticTwitterFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+
+    @Override
+    public String getName() {
+        return "synthetic_twitter_feed";
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+        return new SyntheticTwitterFeedAdapter(configuration);
+    }
+
+}
\ No newline at end of file