ASTERIXDB-1134: Updated the "push_twitter" adapter to allow better filtering of tweet streams (merged with current master)

Change-Id: I054da2c6b809c68d665e81829567bd7224fbaaf2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/441
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 19387bc..86e466e 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -135,12 +135,12 @@
 		<dependency>
 			<groupId>org.twitter4j</groupId>
 			<artifactId>twitter4j-core</artifactId>
-			<version>[3.0,)</version>
+			<version>[4.0,)</version>
 		</dependency>
 		<dependency>
             <groupId>org.twitter4j</groupId>
             <artifactId>twitter4j-stream</artifactId>
-            <version>4.0.2</version>
+            <version>[4.0,)</version>
         </dependency>
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
index fb8738c..38e8a9e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.external.util.TweetProcessor;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import twitter4j.FilterQuery;
 import twitter4j.Query;
 import twitter4j.StallWarning;
@@ -28,13 +31,8 @@
 import twitter4j.StatusDeletionNotice;
 import twitter4j.StatusListener;
 import twitter4j.TwitterStream;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.external.util.TweetProcessor;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * An implementation of @see {PullBasedFeedClient} for the Twitter service. The
@@ -55,7 +53,6 @@
         this.tweetProcessor = new TweetProcessor(recordType);
         this.recordSerDe = new ARecordSerializerDeserializer(recordType);
         this.mutableRecord = tweetProcessor.getMutableRecord();
-        this.initialize(adapter.getConfiguration());
         this.inputQ = new LinkedBlockingQueue<Status>();
         TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
         twitterStream.addListener(new TweetListener(inputQ));
@@ -113,10 +110,4 @@
         return InflowState.DATA_AVAILABLE;
     }
 
-    private void initialize(Map<String, String> params) {
-        this.keywords = (String) params.get(SearchAPIConstants.QUERY);
-        this.query = new Query(keywords);
-        this.query.setCount(100);
-    }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index b4be631..4fb602b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -18,68 +18,159 @@
  */
 package org.apache.asterix.external.util;
 
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.logging.Logger;
-import java.util.logging.Level;
-
+import org.apache.asterix.common.exceptions.AsterixException;
 import twitter4j.FilterQuery;
 import twitter4j.Twitter;
 import twitter4j.TwitterFactory;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
 import twitter4j.conf.ConfigurationBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class TwitterUtil {
 
     private static Logger LOGGER = Logger.getLogger(TwitterUtil.class.getName());
 
     public static class ConfigurationConstants {
-        public static final String KEY_LOCATION = "location";
+        public static final String KEY_LOCATIONS = "locations"; // locations to track
+        public static final String KEY_LOCATION = "location"; // location to track
         public static final String LOCATION_US = "US";
+        public static final String LOCATION_EU = "Europe";
+        public static final String LANGUAGES = "languages"; // languages to track
+        public static final String TRACK = "keywords"; // terms to track
+        public static final String  FILTER_LEVEL = "filter-level";
     }
 
     public static class GeoConstants {
-        public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
         public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+        public static final double[][] EU = new double[][]{{-29.7,36.7},{79.2,72.0}};
+        public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
     }
 
     private static Map<String, double[][]> initializeBoundingBoxes() {
         Map<String, double[][]> boundingBoxes = new HashMap<String, double[][]>();
-        boundingBoxes.put(ConfigurationConstants.LOCATION_US, new double[][] { { -124.848974, 24.396308 },
-                { -66.885444, 49.384358 } });
+        boundingBoxes.put(ConfigurationConstants.LOCATION_US, GeoConstants.US);
+        boundingBoxes.put(ConfigurationConstants.LOCATION_EU, GeoConstants.EU);
         return boundingBoxes;
     }
 
-    public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
-        String locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+    /**
+     * Gets one or more bounding boxes from a semicolon-separated sequence of coordinate
+     * quadruples (following Twitter formats) and/or predefined location names, such as US and Europe.
+     *
+     * E.g., for Europe and US (south-west corner then north-east corner, each as longitude, latitude),
+     *      we would use -29.7, 36.7, 79.2, 72.0; -124.848974, 24.396308, -66.885444, 49.384358.
+     *
+     * @param locationValue
+     *          String value of the location coordinates or names (comma-separated)
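+     * @return two [longitude, latitude] rows per location, or null if locationValue does not match the expected format
+     * @throws AsterixException if a coordinate value cannot be parsed as a double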
+     */
+    public static double[][] getBoundingBoxes(String locationValue) throws AsterixException {
         double[][] locations = null;
-        if (locationValue != null) {
-            if (locationValue.contains(",")) {
-                String[] coordinatesString = locationValue.trim().split(",");
-                locations = new double[2][2];
-                for (int i = 0; i < 2; i++) {
-                    for (int j = 0; j < 2; j++) {
-                        try {
-                            locations[i][j] = Double.parseDouble(coordinatesString[2 * i + j]);
-                        } catch (NumberFormatException ne) {
-                            throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * i + j]);
+
+        String coordRegex = "^((((\\-?\\d+\\.\\d+),\\s*){3}(\\-?\\d+\\.\\d+)|\\w+);\\s*)*(((\\-?\\d+\\.\\d+),\\s*){3}(\\-?\\d+\\.\\d+)|\\w+)$";
+        Pattern p = Pattern.compile(coordRegex);
+        Matcher m = p.matcher(locationValue);
+
+        if (m.matches()) {
+            String[] locationStrings = locationValue.trim().split(";\\s*");
+            locations = new double[locationStrings.length*2][2];
+
+            for(int i=0; i<locationStrings.length; i++) {
+                if (locationStrings[i].contains(",")) {
+                    String[] coordinatesString = locationStrings[i].split(",");
+                    for(int k=0; k < 2; k++) {
+                        for (int l = 0; l < 2; l++) {
+                            try {
+                                locations[2 * i + k][l] = Double.parseDouble(coordinatesString[2 * k + l]);
+                            } catch (NumberFormatException ne) {
+                                throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * k + l]);
+                            }
+                        }
+                    }
+                } else if (GeoConstants.boundingBoxes.containsKey(locationStrings[i])) {
+                    // Only add known locations
+                    double loc[][] = GeoConstants.boundingBoxes.get(locationStrings[i]);
+                    for(int k=0; k < 2; k++) {
+                        for (int l = 0; l < 2; l++) {
+                            locations[2 * i + k][l] = loc[k][l];
                         }
                     }
                 }
-            } else {
-                locations = GeoConstants.boundingBoxes.get(locationValue);
-            }
-            if (locations != null) {
-                FilterQuery filterQuery = new FilterQuery();
-                filterQuery.locations(locations);
-                return filterQuery;
             }
         }
-        return null;
+        return locations;
+    }
+
+    public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
+        String locationValue = null;
+
+        // For backward compatibility
+        if (configuration.containsKey(ConfigurationConstants.KEY_LOCATIONS)) {
+            locationValue = configuration.get(ConfigurationConstants.KEY_LOCATIONS);
+        } else {
+            locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+        }
+        String langValue = configuration.get(ConfigurationConstants.LANGUAGES);
+        String trackValue = configuration.get(ConfigurationConstants.TRACK);
+
+        FilterQuery filterQuery = null;
+
+        // Terms to track
+        if (trackValue != null) {
+            String keywords[] = null;
+            filterQuery = new FilterQuery();
+            if (trackValue.contains(",")) {
+                keywords = trackValue.trim().split(",\\s*");
+            } else {
+                keywords = new String[] {trackValue};
+            }
+            filterQuery = filterQuery.track(keywords);
+        }
+
+        // Language filtering parameter
+        if (langValue != null) {
+            if (filterQuery == null) {
+                filterQuery = new FilterQuery();
+            }
+            String languages[];
+            if (langValue.contains(",")) {
+                languages = langValue.trim().split(",\\s*");
+            } else {
+                languages = new String[] {langValue};
+            }
+            filterQuery = filterQuery.language(languages);
+        }
+
+        // Location filtering parameter
+        if (locationValue != null) {
+            double[][] locations = getBoundingBoxes(locationValue);
+            if (locations != null) {
+                if (filterQuery == null) {
+                    filterQuery = new FilterQuery();
+                }
+                filterQuery = filterQuery.locations(locations);
+            }
+        }
+
+        // Filtering level: none, low or medium (default=none)
+        if(filterQuery != null) {
+            String filterValue = configuration.get(ConfigurationConstants.FILTER_LEVEL);
+            if (filterValue!=null) {
+                filterQuery = filterQuery.filterLevel(filterValue);
+            }
+        }
+
+        return filterQuery;
 
     }
 
diff --git a/asterix-runtime/pom.xml b/asterix-runtime/pom.xml
index d53ab4b..a5dedb0 100644
--- a/asterix-runtime/pom.xml
+++ b/asterix-runtime/pom.xml
@@ -159,7 +159,12 @@
 		<dependency>
 			<groupId>org.twitter4j</groupId>
 			<artifactId>twitter4j-core</artifactId>
-			<version>2.2.3</version>
+			<version>[4.0,)</version>
+		</dependency>
+		<dependency>
+			<groupId>org.twitter4j</groupId>
+			<artifactId>twitter4j-stream</artifactId>
+			<version>[4.0,)</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>