Added push-based Twitter firehose adapter
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPushBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPushBasedFeedClient.java
new file mode 100644
index 0000000..da49240
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPushBasedFeedClient.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public interface IPushBasedFeedClient {
+    /**
+     * @return the input stream exposed by this client (presumably the pushed feed data; confirm)
+     * @throws AsterixException if the stream cannot be provided (assumed; confirm with implementations)
+     */
+    InputStream getInputStream() throws AsterixException;
+
+    /**
+     * Hook for whatever corrective action the feed client has to take after it
+     * has run into an exception.
+     *
+     * @param e
+     *            the exception raised while fetching data from the external source
+     * @throws AsterixException if the corrective action itself fails (assumed; confirm)
+     */
+    void resetOnFailure(Exception e) throws AsterixException;
+
+    /**
+     * @param configuration replacement settings for the client (assumed from the name; confirm)
+     * @return a flag whose meaning is not defined here; NOTE(review): document the contract
+     */
+    boolean alter(Map<String, Object> configuration);
+
+    /** Stops the client; the exact shutdown semantics are left to implementations. */
+    void stop();
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
new file mode 100644
index 0000000..e17db67
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public abstract class StreamBasedAdapter implements IDatasourceAdapter {
+    private static final long serialVersionUID = 1L;
+    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
+
+    protected final ITupleParser tupleParser;
+    protected final IAType sourceDatatype;
+    protected IHyracksTaskContext ctx;
+
+    /** Obtains the tuple parser for {@code ctx} from {@code parserFactory} and keeps the source type and context. */
+    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+        this.sourceDatatype = sourceDatatype;
+        this.tupleParser = parserFactory.createTupleParser(ctx);
+    }
+
+    /** Supplies the stream that {@link #start} passes to the tuple parser for the given partition. */
+    public abstract InputStream getInputStream(int partition) throws IOException;
+
+    /** Hands the partition's input stream, together with {@code writer}, to the tuple parser. */
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        tupleParser.parse(getInputStream(partition), writer);
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 553abf9..50fb050 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -121,8 +121,7 @@
MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET,
- MetadataPrimaryIndexes.FEED_POLICY_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
- MetadataPrimaryIndexes.LIBRARY_DATASET };
+ MetadataPrimaryIndexes.FEED_POLICY_DATASET, MetadataPrimaryIndexes.LIBRARY_DATASET };
secondaryIndexes = new IMetadataIndex[] { MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX };
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 036cb1c..92dc394 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -19,15 +19,12 @@
import java.util.Random;
import java.util.Set;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
new file mode 100644
index 0000000..7b17923
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedFeedClient;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutablePoint;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.Message;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessageIterator;
+
+/** Feed client emitting synthetic tweets from {@link DataGenerator}, as records or as one string per line. */
+public class TweetGenerator extends PullBasedFeedClient implements IPullBasedFeedClient {
+    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_EXCEPTION_PERIOD = "exception-period";
+    public static final String OUTPUT_FORMAT = "output-format";
+
+    public static final String OUTPUT_FORMAT_ARECORD = "arecord";
+    public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
+
+    private static final byte[] EOL = "\n".getBytes(); // terminator written after each string-form tweet
+    private int duration;
+    private long tweetInterval;
+    private int numTweetsBeforeDelay;
+    private TweetMessageIterator tweetIterator = null;
+    private IAObject[] mutableFields;
+    private ARecordType outputRecordType;
+    private int partition;
+    private int tweetCount = 0;
+    private int tweetCountBeforeException = 0;
+    private int exceptionPeriod = -1;
+    private boolean isOutputFormatRecord = false;
+    private OutputStream os;
+
+    /** Reads duration (seconds, default 60), rate and exception period from {@code configuration}. */
+    public TweetGenerator(Map<String, Object> configuration, ARecordType outputRecordType, int partition, String format)
+            throws AsterixException {
+        this.outputRecordType = outputRecordType;
+        this.partition = partition; // stored for reference; not read anywhere inside this class
+        String value = (String) configuration.get(KEY_DURATION);
+        duration = value != null ? Integer.parseInt(value) : 60;
+        initializeTweetRate((String) configuration.get(KEY_TPS));
+        value = (String) configuration.get(KEY_EXCEPTION_PERIOD);
+        if (value != null) {
+            exceptionPeriod = Integer.parseInt(value);
+        }
+        isOutputFormatRecord = OUTPUT_FORMAT_ARECORD.equalsIgnoreCase(format); // null format => string output
+        InitializationInfo info = new InitializationInfo();
+        info.timeDurationInSecs = duration;
+        DataGenerator.initialize(info);
+        tweetIterator = new TweetMessageIterator(duration);
+        initialize();
+    }
+
+    /** Derives the sleep length and the number of tweets emitted between sleeps from the {@code tps} setting. */
+    private void initializeTweetRate(String tps) {
+        numTweetsBeforeDelay = 0;
+        if (tps == null) {
+            tweetInterval = 0; // zero disables throttling in setNextRecord()
+            return;
+        }
+        int val = Integer.parseInt(tps);
+        if (val <= 0) { // 1000 / 0 would mean an endless sleep; a negative rate would never throttle
+            throw new IllegalArgumentException(KEY_TPS + " must be a positive integer but was " + tps);
+        }
+        double interval = 1000.0 / val; // milliseconds per tweet
+        if (interval > 1) {
+            tweetInterval = (long) interval;
+            numTweetsBeforeDelay = 1;
+            return;
+        }
+        double numTweets = 1 / interval; // tweets per millisecond
+        if ((int) numTweets != numTweets) {
+            tweetInterval = 10;
+            numTweetsBeforeDelay = (int) (10 * numTweets * 1.4); // NOTE(review): the 1.4 factor is undocumented
+        } else {
+            tweetInterval = 1;
+            numTweetsBeforeDelay = (int) (numTweets * 1.4);
+        }
+    }
+
+    /** Writes the tweet's string form followed by a newline to the configured output stream. */
+    private void writeTweetString(TweetMessage next) throws IOException {
+        String tweet = next.toString();
+        os.write(tweet.getBytes()); // NOTE(review): platform charset; confirm what the stream's reader expects
+        os.write(EOL);
+        LOGGER.info(tweet);
+    }
+
+    /** Copies the tweet's fields into the reusable mutable record, position by position. */
+    private void writeTweetRecord(TweetMessage next) {
+        LOGGER.info("Generating next tweet");
+        // tweet id
+        ((AMutableString) mutableFields[0]).setValue(next.getTweetid());
+        mutableRecord.setValueAtPos(0, mutableFields[0]);
+
+        // user
+        AMutableRecord userRecord = ((AMutableRecord) mutableFields[1]);
+        ((AMutableString) userRecord.getValueByPos(0)).setValue(next.getUser().getScreenName());
+        ((AMutableString) userRecord.getValueByPos(1)).setValue("en");
+        ((AMutableInt32) userRecord.getValueByPos(2)).setValue(next.getUser().getFriendsCount());
+        ((AMutableInt32) userRecord.getValueByPos(3)).setValue(next.getUser().getStatusesCount());
+        ((AMutableString) userRecord.getValueByPos(4)).setValue(next.getUser().getName());
+        ((AMutableInt32) userRecord.getValueByPos(5)).setValue(next.getUser().getFollowersCount());
+        mutableRecord.setValueAtPos(1, userRecord);
+
+        // location
+        ((AMutablePoint) mutableFields[2]).setValue(next.getSenderLocation().getLatitude(),
+                next.getSenderLocation().getLongitude());
+        mutableRecord.setValueAtPos(2, mutableFields[2]);
+
+        // time
+        ((AMutableDateTime) mutableFields[3]).setValue(next.getSendTime().getChrononTime());
+        mutableRecord.setValueAtPos(3, mutableFields[3]);
+
+        // referred topics
+        ((AMutableUnorderedList) mutableFields[4]).clear();
+        List<String> referredTopics = next.getReferredTopics();
+        for (String topic : referredTopics) {
+            ((AMutableUnorderedList) mutableFields[4]).add(new AMutableString(topic));
+        }
+        mutableRecord.setValueAtPos(4, mutableFields[4]);
+
+        // text
+        Message m = next.getMessageText();
+        String tweetText = new String(m.getMessage(), 0, m.getLength());
+        ((AMutableString) mutableFields[5]).setValue(tweetText);
+        mutableRecord.setValueAtPos(5, mutableFields[5]);
+        LOGGER.info(tweetText);
+    }
+
+    @Override
+    public void resetOnFailure(Exception e) throws AsterixException {
+        // Intentionally empty: the generator takes no corrective action after a failure.
+    }
+
+    @Override
+    public boolean alter(Map<String, Object> configuration) {
+        // Reconfiguration is not supported; the supplied map is ignored.
+        return false;
+    }
+
+    /** Emits the next tweet, sleeps if throttling is due, and throws every {@code exceptionPeriod} tweets. */
+    @Override
+    public InflowState setNextRecord() throws Exception {
+        LOGGER.info("requesting next tweet");
+        boolean moreData = tweetIterator.hasNext();
+        if (!moreData) {
+            return InflowState.NO_MORE_DATA;
+        }
+        LOGGER.info("writing next tweet");
+        TweetMessage msg = tweetIterator.next();
+        if (isOutputFormatRecord) {
+            writeTweetRecord(msg);
+        } else {
+            writeTweetString(msg);
+        }
+        if (tweetInterval != 0) {
+            tweetCount++;
+            if (tweetCount == numTweetsBeforeDelay) {
+                Thread.sleep(tweetInterval);
+                tweetCount = 0;
+            }
+        }
+        tweetCountBeforeException++;
+        if (tweetCountBeforeException == exceptionPeriod) {
+            tweetCountBeforeException = 0;
+            throw new AsterixException("Delibrate exception");
+        }
+        return InflowState.DATA_AVAILABLE;
+    }
+
+    /** Builds the reusable field holders, the mutable record and the record serializer for the output type. */
+    private void initialize() throws AsterixException {
+        ARecordType userRecordType = (ARecordType) outputRecordType.getFieldTypes()[1];
+        IAObject[] userMutableFields = new IAObject[] { new AMutableString(""), new AMutableString(""),
+                new AMutableInt32(0), new AMutableInt32(0), new AMutableString(""), new AMutableInt32(0) };
+        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+        mutableFields = new IAObject[] { new AMutableString(""), new AMutableRecord(userRecordType, userMutableFields),
+                new AMutablePoint(0, 0), new AMutableDateTime(0), new AMutableUnorderedList(unorderedListType),
+                new AMutableString("") };
+        recordSerDe = new ARecordSerializerDeserializer(outputRecordType);
+        mutableRecord = new AMutableRecord(outputRecordType, mutableFields);
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to release: the output stream is closed by whoever supplied it, never by this class.
+    }
+
+    /** Sets the stream that string-form tweets are written to; call before {@link #setNextRecord()}. */
+    public void setOutputStream(OutputStream os) {
+        this.os = os;
+    }
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
new file mode 100644
index 0000000..eef551d
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -0,0 +1,153 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * TPS can be configured between 1 and 20,000
+ *
+ * @author ramang
+ */
+public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
+
+ private Map<String, Object> configuration;
+
+ private TweetGenerator twitterFeedClient;
+
+ private final TwitterServer twitterServer;
+
+ private TwitterClient twitterClient;
+
+ private static final String LOCALHOST = "127.0.0.1";
+ private static final int PORT = 2909;
+
+ public TwitterFirehoseFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+ ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+ super(parserFactory, outputtype, ctx);
+ this.configuration = configuration;
+ this.twitterFeedClient = new TweetGenerator(configuration, outputtype, 0,
+ TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+ this.twitterServer = new TwitterServer(configuration, outputtype);
+ this.twitterClient = new TwitterClient(PORT);
+
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ twitterServer.start();
+ twitterClient.start();
+ super.start(partition, writer);
+ }
+
+ @Override
+ public InputStream getInputStream(int partition) throws IOException {
+ return twitterClient.getInputStream();
+ }
+
+ private static class TwitterServer {
+ private final ServerSocket serverSocket;
+ private final Listener listener;
+
+ public TwitterServer(Map<String, Object> configuration, ARecordType outputtype) throws IOException,
+ AsterixException {
+ serverSocket = new ServerSocket(PORT);
+ listener = new Listener(serverSocket, configuration, outputtype);
+ }
+
+ public void start() {
+ Thread t = new Thread(listener);
+ t.start();
+ }
+
+ }
+
+ private static class TwitterClient {
+
+ private Socket socket;
+ private int port;
+
+ public TwitterClient(int port) throws UnknownHostException, IOException {
+ this.port = port;
+ }
+
+ public InputStream getInputStream() throws IOException {
+ return socket.getInputStream();
+ }
+
+ public void start() throws UnknownHostException, IOException {
+ socket = new Socket(LOCALHOST, port);
+ }
+ }
+
+ private static class Listener implements Runnable {
+
+ private final ServerSocket serverSocket;
+ private Socket socket;
+ private TweetGenerator tweetGenerator;
+
+ public Listener(ServerSocket serverSocket, Map<String, Object> configuration, ARecordType outputtype)
+ throws IOException, AsterixException {
+ this.serverSocket = serverSocket;
+ this.tweetGenerator = new TweetGenerator(configuration, outputtype, 0,
+ TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ InflowState state = InflowState.DATA_AVAILABLE;
+ try {
+ socket = serverSocket.accept();
+ OutputStream os = socket.getOutputStream();
+ tweetGenerator.setOutputStream(os);
+ while (state.equals(InflowState.DATA_AVAILABLE)) {
+ state = tweetGenerator.setNextRecord();
+ }
+ os.close();
+ break;
+ } catch (Exception e) {
+
+ } finally {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void alter(Map<String, Object> properties) {
+ // TODO Auto-generated method stub
+
+ }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
new file mode 100644
index 0000000..11c9172
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for {@link TwitterFirehoseFeedAdapter}: a typed, read-only adapter that emits records of the fixed
+ * {@code TweetMessageType} schema and is placed on a single, randomly chosen node.
+ */
+public class TwitterFirehoseFeedAdapterFactory extends FileSystemAdapterFactory implements ITypedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
+
+    private static final ARecordType outputType = initOutputType();
+
+    @Override
+    public String getName() {
+        return "twitter_firehose_feed";
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    /** Stores the configuration, forces the ADM format, and passes the output type to configureFormat(). */
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        configuration.put(KEY_FORMAT, FORMAT_ADM);
+        this.configureFormat(initOutputType());
+    }
+
+    /** Picks one node at random to host ingestion, avoiding the dataset's storage nodes when others exist. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        String dvds = (String) configuration.get(KEY_DATAVERSE_DATASET);
+        String[] components = dvds == null ? new String[0] : dvds.split(":");
+        if (components.length < 2) {
+            throw new IllegalArgumentException(KEY_DATAVERSE_DATASET + " must look like <dataverse>:<dataset> but was "
+                    + dvds);
+        }
+        String dataverse = components[0];
+        String dataset = components[1];
+        MetadataTransactionContext ctx = null;
+        NodeGroup ng = null;
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            Dataset ds = MetadataManager.INSTANCE.getDataset(ctx, dataverse, dataset);
+            String nodegroupName = ((FeedDatasetDetails) ds.getDatasetDetails()).getNodeGroupName();
+            ng = MetadataManager.INSTANCE.getNodegroup(ctx, nodegroupName);
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) { // beginTransaction() itself may be what failed
+                MetadataManager.INSTANCE.abortTransaction(ctx);
+            }
+            throw e;
+        }
+        List<String> storageNodes = ng.getNodeNames();
+        // Work on a copy so that removeAll() can never shrink the set held by the metadata properties.
+        Set<String> nodes = new java.util.HashSet<String>(
+                AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames());
+        if (nodes.size() > storageNodes.size()) {
+            nodes.removeAll(storageNodes);
+        }
+        String[] nodesArray = nodes.toArray(new String[] {});
+        String ingestionLocation = nodesArray[new Random().nextInt(nodesArray.length)];
+        return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    /** Builds the {@code TweetMessageType} record type: tweet id, user record, location, time, topics, text. */
+    private static ARecordType initOutputType() {
+        try {
+            String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
+                    "followers_count" };
+            IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
+                    BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
+            ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
+
+            String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
+                    "message-text" };
+            AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+            IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
+                    BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
+            return new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+        } catch (AsterixException e) {
+            // Keep the cause: this runs during static initialization, where a bare message is hard to diagnose.
+            throw new IllegalStateException("Unable to initialize output type", e);
+        }
+    }
+}
\ No newline at end of file