Add user-stream for Twitter Adaptor

1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, using following ddl:
  create feed TwitterFeed using twitter_user_stream(
      ("format"="twitter-status"),
      ("type-name"="Tweet"),
      ...
   // rest is same as push feed

Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1272
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 9ead8a9..ffffbd7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,6 +26,8 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.TwitterUtil;
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
@@ -33,6 +35,9 @@
 import twitter4j.StatusListener;
 import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 
 public class TwitterPushRecordReader implements IRecordReader<String> {
     private LinkedBlockingQueue<String> inputQ;
@@ -40,20 +45,32 @@
     private GenericRecord<String> record;
     private boolean closed = false;
 
-    public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
-        record = new GenericRecord<>();
-        inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
-        this.twitterStream.addListener(new TweetListener(inputQ));
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener,
+            FilterQuery query) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
         this.twitterStream.filter(query);
     }
 
-    public TwitterPushRecordReader(TwitterStream twitterStream) {
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.sample();
+    }
+
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.UserTweetsListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.user();
+    }
+
+    private void init(TwitterStream twitterStream) {
         record = new GenericRecord<>();
         inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//
-        this.twitterStream.addListener(new TweetListener(inputQ));
-        twitterStream.sample();
+        this.twitterStream = twitterStream;
     }
 
     @Override
@@ -91,46 +108,6 @@
         return true;
     }
 
-    private class TweetListener implements StatusListener {
-
-        private LinkedBlockingQueue<String> inputQ;
-
-        public TweetListener(LinkedBlockingQueue<String> inputQ) {
-            this.inputQ = inputQ;
-        }
-
-        @Override
-        public void onStatus(Status tweet) {
-            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
-            inputQ.add(jsonTweet);
-        }
-
-        @Override
-        public void onException(Exception arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onDeletionNotice(StatusDeletionNotice arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onScrubGeo(long arg0, long arg1) {
-            // do nothing
-        }
-
-        @Override
-        public void onStallWarning(StallWarning arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onTrackLimitationNotice(int arg0) {
-            // do nothing
-        }
-    }
-
     @Override
     public void setFeedLogManager(FeedLogManager feedLogManager) {
         // do nothing
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 73d1b39..570155c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -46,7 +46,6 @@
     private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
 
     private Map<String, String> configuration;
-    private boolean pull;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
@@ -73,8 +72,8 @@
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
             throw new AsterixException(builder.toString());
         }
-        if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
-            pull = true;
+
+        if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER)) {
             if (configuration.get(SearchAPIConstants.QUERY) == null) {
                 throw new AsterixException(
                         "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
@@ -94,20 +93,9 @@
                             + DEFAULT_INTERVAL + ")");
                 }
             }
-        } else {
-            pull = false;
         }
     }
 
-    public static boolean isTwitterPull(Map<String, String> configuration) {
-        String reader = configuration.get(ExternalDataConstants.KEY_READER);
-        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
-                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
-            return true;
-        }
-        return false;
-    }
-
     @Override
     public boolean isIndexible() {
         return false;
@@ -116,20 +104,34 @@
     @Override
     public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        if (pull) {
-            return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
-                    configuration.get(SearchAPIConstants.QUERY),
-                    Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
-        } else {
-            FilterQuery query;
-            try {
-                query = TwitterUtil.getFilterQuery(configuration);
-                return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
-                        : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
-            } catch (AsterixException e) {
-                throw new HyracksDataException(e);
-            }
+        IRecordReader<? extends String> recordReader;
+        switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+            case ExternalDataConstants.READER_PULL_TWITTER:
+                recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+                        configuration.get(SearchAPIConstants.QUERY),
+                        Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+                break;
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+                FilterQuery query;
+                try {
+                    query = TwitterUtil.getFilterQuery(configuration);
+                    recordReader = (query == null)
+                            ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener())
+                            : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener(), query);
+                } catch (AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+                break;
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+                recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                        TwitterUtil.getUserTweetsListener());
+                break;
+            default:
+                throw new HyracksDataException("No Record reader found!");
         }
+        return recordReader;
     }
 
     @Override
@@ -147,4 +149,5 @@
         }
         return true;
     }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 7ab6430..73a5302 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -97,6 +97,7 @@
             case ExternalDataConstants.READER_TWITTER_PUSH:
             case ExternalDataConstants.READER_PUSH_TWITTER:
             case ExternalDataConstants.READER_PULL_TWITTER:
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
                 return new TwitterRecordReaderFactory();
             case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
             case ExternalDataConstants.SOCKET:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a270e88..881c498 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -127,6 +127,7 @@
     public static final String READER_PUSH_TWITTER = "push_twitter";
     public static final String READER_TWITTER_PULL = "twitter_pull";
     public static final String READER_PULL_TWITTER = "pull_twitter";
+    public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index d8f375b..bd8e52d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -29,11 +30,20 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
 import twitter4j.Twitter;
 import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 import twitter4j.conf.ConfigurationBuilder;
 
 public class TwitterUtil {
@@ -291,4 +301,187 @@
         public static final String INTERVAL = "interval";
     }
 
+    public static UserTweetsListener getUserTweetsListener() {
+        return new UserTweetsListener();
+    }
+
+    public static TweetListener getTweetListener() {
+        return new TweetListener();
+    }
+
+    public static class UserTweetsListener implements UserStreamListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onDeletionNotice(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onFriendList(long[] longs) {
+            //do nothing
+        }
+
+        @Override
+        public void onFavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onFollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onDirectMessage(DirectMessage directMessage) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberAddition(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberDeletion(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListSubscription(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUnsubscription(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListCreation(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUpdate(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListDeletion(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserProfileUpdate(User user) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserSuspension(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserDeletion(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onBlock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnblock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStatus(Status status) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(status);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+            //do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int i) {
+            //do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning stallWarning) {
+            //do nothing
+        }
+
+        @Override
+        public void onException(Exception e) {
+            //do nothing
+        }
+    }
+
+    public static class TweetListener implements StatusListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+            // do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+            // do nothing
+        }
+    }
+
 }