Add user-stream for Twitter Adaptor
1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, using following ddl:
create feed TwitterFeed using twitter_user_stream(
("format"="twitter-status"),
("type-name"="Tweet"),
...
// rest is same as push feed
Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1272
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 9ead8a9..ffffbd7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,6 +26,8 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.TwitterUtil;
+import twitter4j.DirectMessage;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
@@ -33,6 +35,9 @@
import twitter4j.StatusListener;
import twitter4j.TwitterObjectFactory;
import twitter4j.TwitterStream;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
public class TwitterPushRecordReader implements IRecordReader<String> {
private LinkedBlockingQueue<String> inputQ;
@@ -40,20 +45,32 @@
private GenericRecord<String> record;
private boolean closed = false;
- public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
- record = new GenericRecord<>();
- inputQ = new LinkedBlockingQueue<>();
- this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
- this.twitterStream.addListener(new TweetListener(inputQ));
+ public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener,
+ FilterQuery query) {
+ init(twitterStream);
+ tweetListener.setInputQ(inputQ);
+ this.twitterStream.addListener(tweetListener);
this.twitterStream.filter(query);
}
- public TwitterPushRecordReader(TwitterStream twitterStream) {
+ public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener) {
+ init(twitterStream);
+ tweetListener.setInputQ(inputQ);
+ this.twitterStream.addListener(tweetListener);
+ twitterStream.sample();
+ }
+
+ public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.UserTweetsListener tweetListener) {
+ init(twitterStream);
+ tweetListener.setInputQ(inputQ);
+ this.twitterStream.addListener(tweetListener);
+ twitterStream.user();
+ }
+
+ private void init(TwitterStream twitterStream) {
record = new GenericRecord<>();
inputQ = new LinkedBlockingQueue<>();
- this.twitterStream = twitterStream;//
- this.twitterStream.addListener(new TweetListener(inputQ));
- twitterStream.sample();
+ this.twitterStream = twitterStream;
}
@Override
@@ -91,46 +108,6 @@
return true;
}
- private class TweetListener implements StatusListener {
-
- private LinkedBlockingQueue<String> inputQ;
-
- public TweetListener(LinkedBlockingQueue<String> inputQ) {
- this.inputQ = inputQ;
- }
-
- @Override
- public void onStatus(Status tweet) {
- String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
- inputQ.add(jsonTweet);
- }
-
- @Override
- public void onException(Exception arg0) {
- // do nothing
- }
-
- @Override
- public void onDeletionNotice(StatusDeletionNotice arg0) {
- // do nothing
- }
-
- @Override
- public void onScrubGeo(long arg0, long arg1) {
- // do nothing
- }
-
- @Override
- public void onStallWarning(StallWarning arg0) {
- // do nothing
- }
-
- @Override
- public void onTrackLimitationNotice(int arg0) {
- // do nothing
- }
- }
-
@Override
public void setFeedLogManager(FeedLogManager feedLogManager) {
// do nothing
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 73d1b39..570155c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -46,7 +46,6 @@
private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
private Map<String, String> configuration;
- private boolean pull;
private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@Override
@@ -73,8 +72,8 @@
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
throw new AsterixException(builder.toString());
}
- if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
- pull = true;
+
+ if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER)) {
if (configuration.get(SearchAPIConstants.QUERY) == null) {
throw new AsterixException(
"parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
@@ -94,20 +93,9 @@
+ DEFAULT_INTERVAL + ")");
}
}
- } else {
- pull = false;
}
}
- public static boolean isTwitterPull(Map<String, String> configuration) {
- String reader = configuration.get(ExternalDataConstants.KEY_READER);
- if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
- || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
- return true;
- }
- return false;
- }
-
@Override
public boolean isIndexible() {
return false;
@@ -116,20 +104,34 @@
@Override
public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- if (pull) {
- return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
- configuration.get(SearchAPIConstants.QUERY),
- Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
- } else {
- FilterQuery query;
- try {
- query = TwitterUtil.getFilterQuery(configuration);
- return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
- : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ IRecordReader<? extends String> recordReader;
+ switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+ case ExternalDataConstants.READER_PULL_TWITTER:
+ recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+ configuration.get(SearchAPIConstants.QUERY),
+ Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+ break;
+ case ExternalDataConstants.READER_PUSH_TWITTER:
+ FilterQuery query;
+ try {
+ query = TwitterUtil.getFilterQuery(configuration);
+ recordReader = (query == null)
+ ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+ TwitterUtil.getTweetListener())
+ : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+ TwitterUtil.getTweetListener(), query);
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ break;
+ case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+ recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+ TwitterUtil.getUserTweetsListener());
+ break;
+ default:
+ throw new HyracksDataException("No Record reader found!");
}
+ return recordReader;
}
@Override
@@ -147,4 +149,5 @@
}
return true;
}
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 7ab6430..73a5302 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -97,6 +97,7 @@
case ExternalDataConstants.READER_TWITTER_PUSH:
case ExternalDataConstants.READER_PUSH_TWITTER:
case ExternalDataConstants.READER_PULL_TWITTER:
+ case ExternalDataConstants.READER_USER_STREAM_TWITTER:
return new TwitterRecordReaderFactory();
case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
case ExternalDataConstants.SOCKET:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a270e88..881c498 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -127,6 +127,7 @@
public static final String READER_PUSH_TWITTER = "push_twitter";
public static final String READER_TWITTER_PULL = "twitter_pull";
public static final String READER_PULL_TWITTER = "pull_twitter";
+ public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream";
public static final String CLUSTER_LOCATIONS = "cluster-locations";
public static final String SCHEDULER = "hdfs-scheduler";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index d8f375b..bd8e52d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -29,11 +30,20 @@
import org.apache.asterix.common.exceptions.AsterixException;
+import twitter4j.DirectMessage;
import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
import twitter4j.conf.ConfigurationBuilder;
public class TwitterUtil {
@@ -291,4 +301,187 @@
public static final String INTERVAL = "interval";
}
+ public static UserTweetsListener getUserTweetsListener() {
+ return new UserTweetsListener();
+ }
+
+ public static TweetListener getTweetListener() {
+ return new TweetListener();
+ }
+
+ public static class UserTweetsListener implements UserStreamListener {
+
+ private LinkedBlockingQueue<String> inputQ;
+
+ public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+ this.inputQ = inputQ;
+ }
+
+ @Override
+ public void onDeletionNotice(long l, long l1) {
+ //do nothing
+ }
+
+ @Override
+ public void onFriendList(long[] longs) {
+ //do nothing
+ }
+
+ @Override
+ public void onFavorite(User user, User user1, Status status) {
+ //do nothing
+ }
+
+ @Override
+ public void onUnfavorite(User user, User user1, Status status) {
+ //do nothing
+ }
+
+ @Override
+ public void onFollow(User user, User user1) {
+ //do nothing
+ }
+
+ @Override
+ public void onUnfollow(User user, User user1) {
+ //do nothing
+ }
+
+ @Override
+ public void onDirectMessage(DirectMessage directMessage) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListMemberAddition(User user, User user1, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListMemberDeletion(User user, User user1, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListSubscription(User user, User user1, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListUnsubscription(User user, User user1, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListCreation(User user, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListUpdate(User user, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserListDeletion(User user, UserList userList) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserProfileUpdate(User user) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserSuspension(long l) {
+ //do nothing
+ }
+
+ @Override
+ public void onUserDeletion(long l) {
+ //do nothing
+ }
+
+ @Override
+ public void onBlock(User user, User user1) {
+ //do nothing
+ }
+
+ @Override
+ public void onUnblock(User user, User user1) {
+ //do nothing
+ }
+
+ @Override
+ public void onStatus(Status status) {
+ String jsonTweet = TwitterObjectFactory.getRawJSON(status);
+ inputQ.add(jsonTweet);
+ }
+
+ @Override
+ public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+ //do nothing
+ }
+
+ @Override
+ public void onTrackLimitationNotice(int i) {
+ //do nothing
+ }
+
+ @Override
+ public void onScrubGeo(long l, long l1) {
+ //do nothing
+ }
+
+ @Override
+ public void onStallWarning(StallWarning stallWarning) {
+ //do nothing
+ }
+
+ @Override
+ public void onException(Exception e) {
+ //do nothing
+ }
+ }
+
+ public static class TweetListener implements StatusListener {
+
+ private LinkedBlockingQueue<String> inputQ;
+
+ public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+ this.inputQ = inputQ;
+ }
+
+ @Override
+ public void onStatus(Status tweet) {
+ String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+ inputQ.add(jsonTweet);
+ }
+
+ @Override
+ public void onException(Exception arg0) {
+ // do nothing
+ }
+
+ @Override
+ public void onDeletionNotice(StatusDeletionNotice arg0) {
+ // do nothing
+ }
+
+ @Override
+ public void onScrubGeo(long arg0, long arg1) {
+ // do nothing
+ }
+
+ @Override
+ public void onStallWarning(StallWarning arg0) {
+ // do nothing
+ }
+
+ @Override
+ public void onTrackLimitationNotice(int arg0) {
+ // do nothing
+ }
+ }
+
}