added push based Twitter firehose adapter
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPushBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPushBasedFeedClient.java
new file mode 100644
index 0000000..da49240
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPushBasedFeedClient.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public interface IPushBasedFeedClient {
+
+    /**
+     * @return
+     * @throws AsterixException
+     */
+    public InputStream getInputStream() throws AsterixException;
+
+    /**
+     * Provides logic for any corrective action that feed client needs to execute on
+     * encountering an exception.
+     * 
+     * @param e
+     *            The exception encountered during fetching of data from external source
+     * @throws AsterixException
+     */
+    public void resetOnFailure(Exception e) throws AsterixException;
+
+    /**
+     * @param configuration
+     */
+    public boolean alter(Map<String, Object> configuration);
+
+    public void stop();
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
new file mode 100644
index 0000000..e17db67
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public abstract class StreamBasedAdapter implements IDatasourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
+
+    public abstract InputStream getInputStream(int partition) throws IOException;
+
+    protected final ITupleParser tupleParser;
+    protected final IAType sourceDatatype;
+    protected IHyracksTaskContext ctx;
+
+    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx) {
+        this.tupleParser = parserFactory.createTupleParser(ctx);
+        this.sourceDatatype = sourceDatatype;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        InputStream in = getInputStream(partition);
+        tupleParser.parse(in, writer);
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 553abf9..50fb050 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -121,8 +121,7 @@
                 MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
                 MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
                 MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET,
-                MetadataPrimaryIndexes.FEED_POLICY_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
-                MetadataPrimaryIndexes.LIBRARY_DATASET };
+                MetadataPrimaryIndexes.FEED_POLICY_DATASET, MetadataPrimaryIndexes.LIBRARY_DATASET };
         secondaryIndexes = new IMetadataIndex[] { MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
                 MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
                 MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX };
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 036cb1c..92dc394 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -19,15 +19,12 @@
 import java.util.Random;
 import java.util.Set;
 
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
new file mode 100644
index 0000000..7b17923
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedFeedClient;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutablePoint;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.Message;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessageIterator;
+
+public class TweetGenerator extends PullBasedFeedClient implements IPullBasedFeedClient {
+
+    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_EXCEPTION_PERIOD = "exception-period";
+    public static final String OUTPUT_FORMAT = "output-format";
+
+    public static final String OUTPUT_FORMAT_ARECORD = "arecord";
+    public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
+
+    private int duration;
+    private long tweetInterval;
+    private int numTweetsBeforeDelay;
+    private TweetMessageIterator tweetIterator = null;
+    private long exeptionInterval;
+
+    private IAObject[] mutableFields;
+    private ARecordType outputRecordType;
+    private int partition;
+    private int tweetCount = 0;
+    private int tweetCountBeforeException = 0;
+    private int exceptionPeriod = -1;
+    private boolean isOutputFormatRecord = false;
+    private byte[] EOL = "\n".getBytes();
+    private OutputStream os;
+
+    public TweetGenerator(Map<String, Object> configuration, ARecordType outputRecordType, int partition, String format)
+            throws AsterixException {
+        this.outputRecordType = outputRecordType;
+        String value = (String) configuration.get(KEY_DURATION);
+        duration = value != null ? Integer.parseInt(value) : 60;
+        initializeTweetRate((String) configuration.get(KEY_TPS));
+        value = (String) configuration.get(KEY_EXCEPTION_PERIOD);
+        if (value != null) {
+            exceptionPeriod = Integer.parseInt(value);
+        }
+        isOutputFormatRecord = format.equalsIgnoreCase(OUTPUT_FORMAT_ARECORD);
+        InitializationInfo info = new InitializationInfo();
+        info.timeDurationInSecs = duration;
+        DataGenerator.initialize(info);
+        tweetIterator = new TweetMessageIterator(duration);
+        initialize();
+    }
+
+    private void initializeTweetRate(String tps) {
+        numTweetsBeforeDelay = 0;
+        if (tps == null) {
+            tweetInterval = 0;
+        } else {
+            int val = Integer.parseInt(tps);
+            double interval = new Double(((double) 1000 / val));
+            if (interval > 1) {
+                tweetInterval = (long) interval;
+                numTweetsBeforeDelay = 1;
+            } else {
+                tweetInterval = 1;
+                Double numTweets = new Double(1 / interval);
+                if (numTweets.intValue() != numTweets) {
+                    tweetInterval = 10;
+                    numTweetsBeforeDelay = (new Double(10 * numTweets * 1.4)).intValue();
+                } else {
+                    numTweetsBeforeDelay = new Double((numTweets * 1.4)).intValue();
+                }
+            }
+        }
+
+    }
+
+    private void writeTweetString(TweetMessage next) throws IOException {
+        String tweet = next.toString();
+        os.write(tweet.getBytes());
+        os.write(EOL);
+        LOGGER.info(tweet);
+    }
+
+    private void writeTweetRecord(TweetMessage next) {
+
+        //tweet id
+        LOGGER.info("Generating next tweet");
+
+        ((AMutableString) mutableFields[0]).setValue(next.getTweetid());
+        mutableRecord.setValueAtPos(0, mutableFields[0]);
+
+        // user 
+        AMutableRecord userRecord = ((AMutableRecord) mutableFields[1]);
+        ((AMutableString) userRecord.getValueByPos(0)).setValue(next.getUser().getScreenName());
+        ((AMutableString) userRecord.getValueByPos(1)).setValue("en");
+        ((AMutableInt32) userRecord.getValueByPos(2)).setValue(next.getUser().getFriendsCount());
+        ((AMutableInt32) userRecord.getValueByPos(3)).setValue(next.getUser().getStatusesCount());
+        ((AMutableString) userRecord.getValueByPos(4)).setValue(next.getUser().getName());
+        ((AMutableInt32) userRecord.getValueByPos(5)).setValue(next.getUser().getFollowersCount());
+        mutableRecord.setValueAtPos(1, userRecord);
+
+        // location
+        ((AMutablePoint) mutableFields[2]).setValue(next.getSenderLocation().getLatitude(), next.getSenderLocation()
+                .getLongitude());
+        mutableRecord.setValueAtPos(2, mutableFields[2]);
+
+        // time
+        ((AMutableDateTime) mutableFields[3]).setValue(next.getSendTime().getChrononTime());
+        mutableRecord.setValueAtPos(3, mutableFields[3]);
+
+        // referred topics
+        ((AMutableUnorderedList) mutableFields[4]).clear();
+        List<String> referredTopics = next.getReferredTopics();
+        for (String topic : referredTopics) {
+            ((AMutableUnorderedList) mutableFields[4]).add(new AMutableString(topic));
+        }
+        mutableRecord.setValueAtPos(4, mutableFields[4]);
+
+        // text
+        Message m = next.getMessageText();
+        char[] content = m.getMessage();
+        String tweetText = new String(content, 0, m.getLength());
+        ((AMutableString) mutableFields[5]).setValue(tweetText);
+        mutableRecord.setValueAtPos(5, mutableFields[5]);
+        LOGGER.info(tweetText);
+
+    }
+
+    @Override
+    public void resetOnFailure(Exception e) throws AsterixException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public boolean alter(Map<String, Object> configuration) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public InflowState setNextRecord() throws Exception {
+        LOGGER.info("requesting next tweet");
+        boolean moreData = tweetIterator.hasNext();
+        if (!moreData) {
+            return InflowState.NO_MORE_DATA;
+        }
+        LOGGER.info("writing next tweet");
+        TweetMessage msg = tweetIterator.next();
+        if (isOutputFormatRecord) {
+            writeTweetRecord(msg);
+        } else {
+            writeTweetString(msg);
+        }
+        if (tweetInterval != 0) {
+            tweetCount++;
+            if (tweetCount == numTweetsBeforeDelay) {
+                Thread.sleep(tweetInterval);
+                tweetCount = 0;
+            }
+        }
+        tweetCountBeforeException++;
+
+        if (tweetCountBeforeException == exceptionPeriod) {
+            tweetCountBeforeException = 0;
+            throw new AsterixException("Delibrate exception");
+        }
+        return InflowState.DATA_AVAILABLE;
+    }
+
+    private void initialize() throws AsterixException {
+        ARecordType userRecordType = (ARecordType) outputRecordType.getFieldTypes()[1];
+        IAObject[] userMutableFields = new IAObject[] { new AMutableString(""), new AMutableString(""),
+                new AMutableInt32(0), new AMutableInt32(0), new AMutableString(""), new AMutableInt32(0) };
+        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+        mutableFields = new IAObject[] { new AMutableString(""), new AMutableRecord(userRecordType, userMutableFields),
+                new AMutablePoint(0, 0), new AMutableDateTime(0), new AMutableUnorderedList(unorderedListType),
+                new AMutableString("") };
+        recordSerDe = new ARecordSerializerDeserializer(outputRecordType);
+        mutableRecord = new AMutableRecord(outputRecordType, mutableFields);
+
+    }
+
+    @Override
+    public void stop() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void setOutputStream(OutputStream os) {
+        this.os = os;
+    }
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
new file mode 100644
index 0000000..eef551d
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -0,0 +1,153 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * TPS can be configured between 1 and 20,000
+ * 
+ * @author ramang
+ */
+public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
+
+    private Map<String, Object> configuration;
+
+    private TweetGenerator twitterFeedClient;
+
+    private final TwitterServer twitterServer;
+
+    private TwitterClient twitterClient;
+
+    private static final String LOCALHOST = "127.0.0.1";
+    private static final int PORT = 2909;
+
+    public TwitterFirehoseFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+            ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+        super(parserFactory, outputtype, ctx);
+        this.configuration = configuration;
+        this.twitterFeedClient = new TweetGenerator(configuration, outputtype, 0,
+                TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+        this.twitterServer = new TwitterServer(configuration, outputtype);
+        this.twitterClient = new TwitterClient(PORT);
+
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        twitterServer.start();
+        twitterClient.start();
+        super.start(partition, writer);
+    }
+
+    @Override
+    public InputStream getInputStream(int partition) throws IOException {
+        return twitterClient.getInputStream();
+    }
+
+    private static class TwitterServer {
+        private final ServerSocket serverSocket;
+        private final Listener listener;
+
+        public TwitterServer(Map<String, Object> configuration, ARecordType outputtype) throws IOException,
+                AsterixException {
+            serverSocket = new ServerSocket(PORT);
+            listener = new Listener(serverSocket, configuration, outputtype);
+        }
+
+        public void start() {
+            Thread t = new Thread(listener);
+            t.start();
+        }
+
+    }
+
+    private static class TwitterClient {
+
+        private Socket socket;
+        private int port;
+
+        public TwitterClient(int port) throws UnknownHostException, IOException {
+            this.port = port;
+        }
+
+        public InputStream getInputStream() throws IOException {
+            return socket.getInputStream();
+        }
+
+        public void start() throws UnknownHostException, IOException {
+            socket = new Socket(LOCALHOST, port);
+        }
+    }
+
+    private static class Listener implements Runnable {
+
+        private final ServerSocket serverSocket;
+        private Socket socket;
+        private TweetGenerator tweetGenerator;
+
+        public Listener(ServerSocket serverSocket, Map<String, Object> configuration, ARecordType outputtype)
+                throws IOException, AsterixException {
+            this.serverSocket = serverSocket;
+            this.tweetGenerator = new TweetGenerator(configuration, outputtype, 0,
+                    TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                InflowState state = InflowState.DATA_AVAILABLE;
+                try {
+                    socket = serverSocket.accept();
+                    OutputStream os = socket.getOutputStream();
+                    tweetGenerator.setOutputStream(os);
+                    while (state.equals(InflowState.DATA_AVAILABLE)) {
+                        state = tweetGenerator.setNextRecord();
+                    }
+                    os.close();
+                    break;
+                } catch (Exception e) {
+
+                } finally {
+                    try {
+                        socket.close();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void alter(Map<String, Object> properties) {
+        // TODO Auto-generated method stub
+
+    }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
new file mode 100644
index 0000000..11c9172
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -0,0 +1,142 @@
+/*
+x * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The
+ * adapter simulates a feed from the contents of a source file. The file can be
+ * on the local file system or on HDFS. The feed ends when the content of the
+ * source file has been ingested.
+ */
+public class TwitterFirehoseFeedAdapterFactory extends FileSystemAdapterFactory implements ITypedAdapterFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
+
+    private static final ARecordType outputType = initOutputType();
+
+    @Override
+    public String getName() {
+        return "twitter_firehose_feed";
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        configuration.put(KEY_FORMAT, FORMAT_ADM);
+        this.configureFormat(initOutputType());
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        String dvds = (String) configuration.get(KEY_DATAVERSE_DATASET);
+        String[] components = dvds.split(":");
+        String dataverse = components[0];
+        String dataset = components[1];
+        MetadataTransactionContext ctx = null;
+        NodeGroup ng = null;
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            Dataset ds = MetadataManager.INSTANCE.getDataset(ctx, dataverse, dataset);
+            String nodegroupName = ((FeedDatasetDetails) ds.getDatasetDetails()).getNodeGroupName();
+            ng = MetadataManager.INSTANCE.getNodegroup(ctx, nodegroupName);
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(ctx);
+            throw e;
+        }
+        List<String> storageNodes = ng.getNodeNames();
+        Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+        String ingestionLocation = null;
+        if (nodes.size() > storageNodes.size()) {
+            nodes.removeAll(storageNodes);
+        }
+        String[] nodesArray = nodes.toArray(new String[] {});
+        Random r = new Random();
+        ingestionLocation = nodesArray[r.nextInt(nodes.size())];
+        return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    private static ARecordType initOutputType() {
+        ARecordType outputType = null;
+        try {
+            String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
+                    "followers_count" };
+
+            IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
+                    BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
+            ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
+
+            String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
+                    "message-text" };
+
+            AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+            IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
+                    BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
+            outputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+
+        } catch (AsterixException e) {
+            throw new IllegalStateException("Unable to initialize output type");
+        }
+        return outputType;
+    }
+}
\ No newline at end of file