ASTERIXDB-1134: Updated the "push_twitter" adapter to allow better filtering of tweet streams (merged with current master)
Change-Id: I054da2c6b809c68d665e81829567bd7224fbaaf2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/441
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 19387bc..86e466e 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -135,12 +135,12 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>[3.0,)</version>
+ <version>[4.0,)</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
- <version>4.0.2</version>
+ <version>[4.0,)</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
index fb8738c..38e8a9e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.external.dataset.adapter;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.external.util.TweetProcessor;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.StallWarning;
@@ -28,13 +31,8 @@
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.external.util.TweetProcessor;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import java.util.concurrent.LinkedBlockingQueue;
/**
* An implementation of @see {PullBasedFeedClient} for the Twitter service. The
@@ -55,7 +53,6 @@
this.tweetProcessor = new TweetProcessor(recordType);
this.recordSerDe = new ARecordSerializerDeserializer(recordType);
this.mutableRecord = tweetProcessor.getMutableRecord();
- this.initialize(adapter.getConfiguration());
this.inputQ = new LinkedBlockingQueue<Status>();
TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
twitterStream.addListener(new TweetListener(inputQ));
@@ -113,10 +110,4 @@
return InflowState.DATA_AVAILABLE;
}
- private void initialize(Map<String, String> params) {
- this.keywords = (String) params.get(SearchAPIConstants.QUERY);
- this.query = new Query(keywords);
- this.query.setCount(100);
- }
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index b4be631..4fb602b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -18,68 +18,159 @@
*/
package org.apache.asterix.external.util;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.logging.Logger;
-import java.util.logging.Level;
-
+import org.apache.asterix.common.exceptions.AsterixException;
import twitter4j.FilterQuery;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class TwitterUtil {
private static Logger LOGGER = Logger.getLogger(TwitterUtil.class.getName());
public static class ConfigurationConstants {
- public static final String KEY_LOCATION = "location";
+ public static final String KEY_LOCATIONS = "locations"; // locations to track
+ public static final String KEY_LOCATION = "location"; // location to track
public static final String LOCATION_US = "US";
+ public static final String LOCATION_EU = "Europe";
+ public static final String LANGUAGES = "languages"; // languages to track
+ public static final String TRACK = "keywords"; // terms to track
+ public static final String FILTER_LEVEL = "filter-level";
}
public static class GeoConstants {
- public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+ public static final double[][] EU = new double[][]{{-29.7,36.7},{79.2,72.0}};
+ public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
}
private static Map<String, double[][]> initializeBoundingBoxes() {
Map<String, double[][]> boundingBoxes = new HashMap<String, double[][]>();
- boundingBoxes.put(ConfigurationConstants.LOCATION_US, new double[][] { { -124.848974, 24.396308 },
- { -66.885444, 49.384358 } });
+ boundingBoxes.put(ConfigurationConstants.LOCATION_US, GeoConstants.US);
+ boundingBoxes.put(ConfigurationConstants.LOCATION_EU, GeoConstants.EU);
return boundingBoxes;
}
- public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
- String locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+ /**
+ * Builds one or more bounding boxes from a sequence of coordinates
+ * (following Twitter formats) and/or predefined location names, such as US and Europe.
+ *
+ * E.g., for Europe and the US, we would use -29.7, 36.7, 79.2, 72.0; -124.848974,
+ * 24.396308, -66.885444, 49.384358.
+ *
+ * @param locationValue
+ * String of bounding boxes and/or location names separated by ';' (the four coordinates of a box are comma-separated)
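+ * @return two rows (corner points) per location, or null if locationValue does not match the expected format
+ * @throws AsterixException if a coordinate cannot be parsed as a double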
+ */
+ public static double[][] getBoundingBoxes(String locationValue) throws AsterixException {
double[][] locations = null;
- if (locationValue != null) {
- if (locationValue.contains(",")) {
- String[] coordinatesString = locationValue.trim().split(",");
- locations = new double[2][2];
- for (int i = 0; i < 2; i++) {
- for (int j = 0; j < 2; j++) {
- try {
- locations[i][j] = Double.parseDouble(coordinatesString[2 * i + j]);
- } catch (NumberFormatException ne) {
- throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * i + j]);
+
+ String coordRegex = "^((((\\-?\\d+\\.\\d+),\\s*){3}(\\-?\\d+\\.\\d+)|\\w+);\\s*)*(((\\-?\\d+\\.\\d+),\\s*){3}(\\-?\\d+\\.\\d+)|\\w+)$";
+ Pattern p = Pattern.compile(coordRegex);
+ Matcher m = p.matcher(locationValue);
+
+ if (m.matches()) {
+ String[] locationStrings = locationValue.trim().split(";\\s*");
+ locations = new double[locationStrings.length*2][2];
+
+ for(int i=0; i<locationStrings.length; i++) {
+ if (locationStrings[i].contains(",")) {
+ String[] coordinatesString = locationStrings[i].split(",");
+ for(int k=0; k < 2; k++) {
+ for (int l = 0; l < 2; l++) {
+ try {
+ locations[2 * i + k][l] = Double.parseDouble(coordinatesString[2 * k + l]);
+ } catch (NumberFormatException ne) {
+ throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * k + l]);
+ }
+ }
+ }
+ } else if (GeoConstants.boundingBoxes.containsKey(locationStrings[i])) {
+ // Only add known locations
+ double loc[][] = GeoConstants.boundingBoxes.get(locationStrings[i]);
+ for(int k=0; k < 2; k++) {
+ for (int l = 0; l < 2; l++) {
+ locations[2 * i + k][l] = loc[k][l];
}
}
}
- } else {
- locations = GeoConstants.boundingBoxes.get(locationValue);
- }
- if (locations != null) {
- FilterQuery filterQuery = new FilterQuery();
- filterQuery.locations(locations);
- return filterQuery;
}
}
- return null;
+ return locations;
+ }
+
+ public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
+ String locationValue = null;
+
+ // For backward compatibility
+ if (configuration.containsKey(ConfigurationConstants.KEY_LOCATIONS)) {
+ locationValue = configuration.get(ConfigurationConstants.KEY_LOCATIONS);
+ } else {
+ locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+ }
+ String langValue = configuration.get(ConfigurationConstants.LANGUAGES);
+ String trackValue = configuration.get(ConfigurationConstants.TRACK);
+
+ FilterQuery filterQuery = null;
+
+ // Terms to track
+ if (trackValue != null) {
+ String keywords[] = null;
+ filterQuery = new FilterQuery();
+ if (trackValue.contains(",")) {
+ keywords = trackValue.trim().split(",\\s*");
+ } else {
+ keywords = new String[] {trackValue};
+ }
+ filterQuery = filterQuery.track(keywords);
+ }
+
+ // Language filtering parameter
+ if (langValue != null) {
+ if (filterQuery == null) {
+ filterQuery = new FilterQuery();
+ }
+ String languages[];
+ if (langValue.contains(",")) {
+ languages = langValue.trim().split(",\\s*");
+ } else {
+ languages = new String[] {langValue};
+ }
+ filterQuery = filterQuery.language(languages);
+ }
+
+ // Location filtering parameter
+ if (locationValue != null) {
+ double[][] locations = getBoundingBoxes(locationValue);
+ if (locations != null) {
+ if (filterQuery == null) {
+ filterQuery = new FilterQuery();
+ }
+ filterQuery = filterQuery.locations(locations);
+ }
+ }
+
+ // Filtering level: none, low or medium (default=none)
+ if(filterQuery != null) {
+ String filterValue = configuration.get(ConfigurationConstants.FILTER_LEVEL);
+ if (filterValue!=null) {
+ filterQuery = filterQuery.filterLevel(filterValue);
+ }
+ }
+
+ return filterQuery;
}
diff --git a/asterix-runtime/pom.xml b/asterix-runtime/pom.xml
index d53ab4b..a5dedb0 100644
--- a/asterix-runtime/pom.xml
+++ b/asterix-runtime/pom.xml
@@ -159,7 +159,12 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>2.2.3</version>
+ <version>[4.0,)</version>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>[4.0,)</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>