Fix for ASTERIXDB-951, ASTERIXDB-1074
Change-Id: Ibdb5074d0e0d4fb2b7d4303aa405c9fc90f4bd09
Reviewed-on: https://asterix-gerrit.ics.uci.edu/378
Reviewed-by: Chen Li <chenli@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Heri Ramampiaro <heriram@gmail.com>
diff --git a/asterix-doc/src/site/markdown/feeds/tutorial.md b/asterix-doc/src/site/markdown/feeds/tutorial.md
index 16da5aa..1a40327 100644
--- a/asterix-doc/src/site/markdown/feeds/tutorial.md
+++ b/asterix-doc/src/site/markdown/feeds/tutorial.md
@@ -30,14 +30,14 @@
AsterixDB currently provides built-in adaptors for several popular
data sources such as Twitter, CNN, and RSS feeds. AsterixDB additionally
provides a generic socket-based adaptor that can be used
-to ingest data that is directed at a prescribed socket.
+to ingest data that is directed at a prescribed socket.
In this tutorial, we shall describe building two example data ingestion pipelines that cover the popular scenario of ingesting data from (a) Twitter and (b) RSS Feed source.
####Ingesting Twitter Stream
We shall use the built-in push-based Twitter adaptor.
-As a pre-requisite, we must define a Tweet using the AsterixDB Data Model (ADM) and the AsterixDB Query Language (AQL). Given below are the type definition in AQL that create a Tweet datatype which is representative of a real tweet as obtained from Twitter.
+As a pre-requisite, we must define a Tweet using the AsterixDB Data Model (ADM) and the AsterixDB Query Language (AQL). Given below are the type definition in AQL that create a Tweet datatype which is representative of a real tweet as obtained from Twitter.
create dataverse feeds;
use dataverse feeds;
@@ -48,7 +48,7 @@
friends_count: int32,
status_count: int32,
name: string,
- followers_count: string
+ followers_count: int32
};
create type Tweet if not exists as open{
id: string,
@@ -59,15 +59,15 @@
message_text:string
};
- create dataset Tweets (Tweet)
+ create dataset Tweets (Tweet)
primary key id;
-We also create a dataset that we shall use to persist the tweets in AsterixDB.
-Next we make use of the `create feed` AQL statement to define our example data feed.
+We also create a dataset that we shall use to persist the tweets in AsterixDB.
+Next we make use of the `create feed` AQL statement to define our example data feed.
#####Using the "push_twitter" feed adapter#####
The "push_twitter" adaptor requires setting up an application account with Twitter. To retrieve
-tweets, Twitter requires registering an application with Twitter. Registration involves providing a name and a brief description for the application. Each application has an associated OAuth authentication credential that includes OAuth keys and tokens. Accessing the
+tweets, Twitter requires registering an application with Twitter. Registration involves providing a name and a brief description for the application. Each application has an associated OAuth authentication credential that includes OAuth keys and tokens. Accessing the
Twitter API requires providing the following.
1. Consumer Key (API Key)
2. Consumer Secret (API Secret)
@@ -75,21 +75,21 @@
4. Access Token Secret
The "push_twitter" adaptor takes as configuration the above mentioned
-parameters. End users are required to obtain the above authentication credentials prior to using the "push_twitter" adaptor. For further information on obtaining OAuth keys and tokens and registering an application with Twitter, please visit http://apps.twitter.com
+parameters. End users are required to obtain the above authentication credentials prior to using the "push_twitter" adaptor. For further information on obtaining OAuth keys and tokens and registering an application with Twitter, please visit http://apps.twitter.com
-Given below is an example AQL statement that creates a feed called "TwitterFeed" by using the
-"push_twitter" adaptor.
+Given below is an example AQL statement that creates a feed called "TwitterFeed" by using the
+"push_twitter" adaptor.
use dataverse feeds;
create feed TwitterFeed if not exists using "push_twitter"
(("type-name"="Tweet"),
- ("consumer.key"="************"),
+ ("consumer.key"="************"),
("consumer.secret"="**************"),
- ("access.token"="**********"),
+ ("access.token"="**********"),
("access.token.secret"="*************"));
-It is required that the above authentication parameters are provided valid values.
+It is required that the above authentication parameters are provided valid values.
Note that the `create feed` statement does not initiate the flow of data from Twitter into our AsterixDB instance. Instead, the `create feed` statement only results in registering the feed with AsterixDB. The flow of data along a feed is initiated when it is connected
to a target dataset using the connect feed statement (which we shall revisit later).
@@ -119,7 +119,7 @@
the `TwitterFeed` feed in the `Tweets` dataset.
If it is required (by the high-level application) to also retain the raw
tweets obtained from Twitter, the end user may additionally choose
-to connect TwitterFeed to a different dataset.
+to connect TwitterFeed to a different dataset.
Let the feed run for a minute, then run the following query to see the
latest tweets that are stored into the data set.
@@ -141,36 +141,36 @@
####Ingesting an RSS Feed
-RSS (Rich Site Summary), originally RDF Site Summary and often called Really Simple Syndication, uses a family of standard web feed formats to publish frequently updated information: blog entries, news headlines, audio, video. An RSS document (called "feed", "web feed", or "channel") includes full or summarized text, and metadata, like publishing date and author's name. RSS feeds enable publishers to syndicate data automatically.
+RSS (Rich Site Summary), originally RDF Site Summary and often called Really Simple Syndication, uses a family of standard web feed formats to publish frequently updated information: blog entries, news headlines, audio, video. An RSS document (called "feed", "web feed", or "channel") includes full or summarized text, and metadata, like publishing date and author's name. RSS feeds enable publishers to syndicate data automatically.
#####Using the "rss_feed" feed adapter#####
-AsterixDB provides a built-in feed adaptor that allows retrieving data given a collection of RSS end point URLs. As observed in the case of ingesting tweets, it is required to model an RSS data item using AQL.
+AsterixDB provides a built-in feed adaptor that allows retrieving data given a collection of RSS end point URLs. As observed in the case of ingesting tweets, it is required to model an RSS data item using AQL.
use dataverse feeds;
create type Rss if not exists as open {
- id: string,
- title: string,
- description: string,
- link: string
+ id: string,
+ title: string,
+ description: string,
+ link: string
};
create dataset RssDataset (Rss)
- primary key id;
+ primary key id;
-Next, we define an RSS feed using our built-in adaptor "rss_feed".
+Next, we define an RSS feed using our built-in adaptor "rss_feed".
use dataverse feeds;
- create feed my_feed using
- rss_feed (
- ("type-name"="Rss"),
- ("url"="http://rss.cnn.com/rss/edition.rss")
- );
+ create feed my_feed using
+ rss_feed (
+ ("type-name"="Rss"),
+ ("url"="http://rss.cnn.com/rss/edition.rss")
+ );
-In the above definition, the configuration parameter "url" can be a comma-separated list that reflects a collection of RSS URLs, where each URL corresponds to an RSS endpoint or a RSS feed.
-The "rss_adaptor" retrieves data from each of the specified RSS URLs (comma separated values) in parallel.
+In the above definition, the configuration parameter "url" can be a comma-separated list that reflects a collection of RSS URLs, where each URL corresponds to an RSS endpoint or a RSS feed.
+The "rss_adaptor" retrieves data from each of the specified RSS URLs (comma separated values) in parallel.
The following statements connect the feed into the `RssDataset`:
@@ -221,15 +221,15 @@
- *excess.records.spill*: Set to true if records that cannot be processed by an operator for lack of resources (referred to as excess records hereafter) should be persisted to the local disk for deferred processing. (Default: false)
-- *excess.records.discard*: Set to true if excess records should be discarded. (Default: false)
+- *excess.records.discard*: Set to true if excess records should be discarded. (Default: false)
-- *excess.records.throttle*: Set to true if rate of arrival of records is required to be reduced in an adaptive manner to prevent having any excess records (Default: false)
+- *excess.records.throttle*: Set to true if rate of arrival of records is required to be reduced in an adaptive manner to prevent having any excess records (Default: false)
-- *excess.records.elastic*: Set to true if the system should attempt to resolve resource bottlenecks by re-structuring and/or rescheduling the feed ingestion pipeline. (Default: false)
+- *excess.records.elastic*: Set to true if the system should attempt to resolve resource bottlenecks by re-structuring and/or rescheduling the feed ingestion pipeline. (Default: false)
-- *recover.soft.failure*: Set to true if the feed must attempt to survive any runtime exception. A false value permits an early termination of a feed in such an event. (Default: true)
+- *recover.soft.failure*: Set to true if the feed must attempt to survive any runtime exception. A false value permits an early termination of a feed in such an event. (Default: true)
-- *recover.soft.failure*: Set to true if the feed must attempt to survive a hardware failures (loss of AsterixDB node(s)). A false value permits the early termination of a feed in the event of a hardware failure (Default: false)
+- *recover.soft.failure*: Set to true if the feed must attempt to survive a hardware failures (loss of AsterixDB node(s)). A false value permits the early termination of a feed in the event of a hardware failure (Default: false)
Note that the end user may choose to form a custom policy. For example,
it is possible in AsterixDB to create a custom policy that spills excess
@@ -244,5 +244,3 @@
connect feed TwitterFeed to dataset Tweets
using policy Basic ;
-
-
diff --git a/asterix-doc/src/site/markdown/udf.md b/asterix-doc/src/site/markdown/udf.md
new file mode 100644
index 0000000..2a8a2cc
--- /dev/null
+++ b/asterix-doc/src/site/markdown/udf.md
@@ -0,0 +1,170 @@
+# Support for User Defined Functions in AsterixDB #
+
+## <a id="#toc">Table of Contents</a> ##
+* [Using UDF to preprocess feed-collected data](#PreprocessingCollectedData)
+* [Writing an External UDF](#WritingAnExternalUDF)
+* [Creating an AsterixDB Library](#CreatingAnAsterixDBLibrary)
+* [Installing an AsterixDB Library](#installingUDF)
+
+In this document, we describe the support for implementing, using, and installing user-defined functions (UDF) in
+AsterixDB. We will explain how we can use UDFs to preprocess, e.g., data collected using feeds (see the [feeds tutorial](feeds/tutorial.html)).
+
+
+### <a name="installingUDF">Installing an AsterixDB Library</a>###
+
+We assume you have followed the [installation instructions](../install.html) to set up a running AsterixDB instance. Let us refer your AsterixDB instance by the name "my_asterix".
+
+- Step 1: Stop the AsterixDB instance if it is in the ACTIVE state.
+
+ $ managix stop -n my_asterix
+
+- Step 2: Install the library using Managix install command. Just to illustrate, we use the help command to look up the syntax
+
+ $ managix help -cmd install
+ Installs a library to an asterix instance.
+ Options
+ n Name of Asterix Instance
+ d Name of the dataverse under which the library will be installed
+ l Name of the library
+ p Path to library zip bundle
+
+Above is a sample output and explains the usage and the required parameters. Each library has a name and is installed under a dataverse. Recall that we had created a dataverse by the name - "feeds" prior to creating our datatypes and dataset. We shall name our library - "testlib".
+
+We assume you have a library zip bundle that needs to be installed.
+To install the library, use the Managix install command. An example is shown below.
+
+ $ managix install -n my_asterix -d feeds -l testlib -p extlibs/asterix-external-data-0.8.7-binary-assembly.zip
+
+You should see the following message:
+
+ INFO: Installed library testlib
+
+We shall next start our AsterixDB instance using the start command as shown below.
+
+ $ managix start -n my_asterix
+
+You may now use the AsterixDB library in AQL statements and queries. To look at the installed artifacts, you may execute the following query at the AsterixDB web-console.
+
+ for $x in dataset Metadata.Function
+ return $x
+
+ for $x in dataset Metadata.Library
+ return $x
+
+Our library is now installed and is ready to be used.
+
+
+## <a id="PreprocessingCollectedData">Preprocessing Collected Data</a> ###
+
+In the following we assume that you already created the `TwitterFeed` and its corresponding data types and dataset following the instruction explained in the [feeds tutorial](feeds/tutorial.html).
+
+A feed definition may optionally include the specification of a
+user-defined function that is to be applied to each feed record prior
+to persistence. Examples of pre-processing might include adding
+attributes, filtering out records, sampling, sentiment analysis, feature
+extraction, etc. We can express a UDF, which can be defined in AQL or in a programming
+language such as Java, to perform such pre-processing. An AQL UDF is a good fit when
+pre-processing a record requires the result of a query (join or aggregate)
+over data contained in AsterixDB datasets. More sophisticated
+processing such as sentiment analysis of text is better handled
+by providing a Java UDF. A Java UDF has an initialization phase
+that allows the UDF to access any resources it may need to initialize
+itself prior to being used in a data flow. It is assumed by the
+AsterixDB compiler to be stateless and thus usable as an embarrassingly
+parallel black box. In contrast, the AsterixDB compiler can
+reason about an AQL UDF and involve the use of indexes during
+its invocation.
+
+We consider an example transformation of a raw tweet into its
+lightweight version called `ProcessedTweet`, which is defined next.
+
+ use dataverse feeds;
+
+ create type ProcessedTweet if not exists as open {
+ id: string,
+ user_name:string,
+ location:point,
+ created_at:string,
+ message_text:string,
+ country: string,
+ topics: {{string}}
+ };
+
+ create dataset ProcessedTweets(ProcessedTweet)
+ primary key id;
+
+The processing required in transforming a collected tweet to its lighter version of type `ProcessedTweet` involves extracting the topics or hash-tags (if any) in a tweet
+and collecting them in the referred "topics" attribute for the tweet.
+Additionally, the latitude and longitude values (doubles) are combined into the spatial point type. Note that spatial data types are considered as first-class citizens that come with the support for creating indexes. Next we show a revised version of our example TwitterFeed that involves the use of a UDF. We assume that the UDF that contains the transformation logic into a "ProcessedTweet" is available as a Java UDF inside an AsterixDB library named 'testlib'. We defer the writing of a Java UDF and its installation as part of an AsterixDB library to a later section of this document.
+
+ use dataverse feeds;
+
+ create feed ProcessedTwitterFeed if not exists
+ using "push_twitter"
+ (("type-name"="Tweet"),
+ ("consumer.key"="************"),
+ ("consumer.secret"="**************"),
+ ("access.token"="**********"),
+ ("access.token.secret"="*************"))
+
+ apply function testlib#addHashTagsInPlace;
+
+Note that a feed adaptor and a UDF act as pluggable components. These
+contribute towards providing a generic "plug-and-play" model where
+custom implementations can be provided to cater to specific requirements.
+
+####Building a Cascade Network of Feeds####
+Multiple high-level applications may wish to consume the data
+ingested from a data feed. Each such application might perceive the
+feed in a different way and require the arriving data to be processed
+and/or persisted differently. Building a separate flow of data from
+the external source for each application is wasteful of resources as
+the pre-processing or transformations required by each application
+might overlap and could be done together in an incremental fashion
+to avoid redundancy. A single flow of data from the external source
+could provide data for multiple applications. To achieve this, we
+introduce the notion of primary and secondary feeds in AsterixDB.
+
+A feed in AsterixDB is considered to be a primary feed if it gets
+its data from an external data source. The records contained in a
+feed (subsequent to any pre-processing) are directed to a designated
+AsterixDB dataset. Alternatively or additionally, these records can
+be used to derive other feeds known as secondary feeds. A secondary
+feed is similar to its parent feed in every other aspect; it can
+have an associated UDF to allow for any subsequent processing,
+can be persisted into a dataset, and/or can be made to derive other
+secondary feeds to form a cascade network. A primary feed and a
+dependent secondary feed form a hierarchy. As an example, we next show an
+example AQL statement that redefines the previous feed
+"ProcessedTwitterFeed" in terms of their
+respective parent feed (TwitterFeed).
+
+ use dataverse feeds;
+
+ drop feed ProcessedTwitterFeed if exists;
+
+ create secondary feed ProcessedTwitterFeed from feed TwitterFeed
+ apply function testlib#addHashTags;
+
+ connect feed ProcessedTwitterFeed to dataset ProcessedTweets;
+
+The `addHashTags` function is already provided in the example UDF.To see what records
+are being inserted into the dataset, we can perform a simple dataset scan after
+allowing a few moments for the feed to start ingesting data:
+
+ use dataverse feeds;
+
+ for $i in dataset ProcessedTweets limit 10 return $i;
+
+For an example of how to write a Java UDF from scratch, the source for the example
+UDF that has been used in this tutorial is available [here] (https://github.com/apache/incubator-asterixdb/tree/master/asterix-external-data/src/test/java/org/apache/asterix/external/library)
+
+## <a name="installingUDF">Unstalling an AsterixDB Library</a>###
+
+To uninstall a library, use the Managix uninstall command as follows:
+
+ $ managix stop -n my_asterix
+
+ $ managix uninstall -n my_asterix -d feeds -l testlib
+
+
diff --git a/asterix-doc/src/site/site.xml b/asterix-doc/src/site/site.xml
index 832a121..6c1e0cb 100644
--- a/asterix-doc/src/site/site.xml
+++ b/asterix-doc/src/site/site.xml
@@ -86,6 +86,7 @@
<item name="AQL Support of Similarity Queries" href="aql/similarity.html"/>
<item name="Accessing External Data" href="aql/externaldata.html"/>
<item name="Support for Data Ingestion in AsterixDB" href="feeds/tutorial.html" />
+ <item name="Support for User Defined Functions in AsterixDB" href="udf.html" />
<item name="Filter-Based LSM Index Acceleration" href="aql/filters.html"/>
<item name="HTTP API to AsterixDB" href="api.html"/>
</menu>
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 4e08d79..84e3d38 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,11 +18,6 @@
*/
package org.apache.asterix.external.library.java;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.util.LinkedHashMap;
-import java.util.List;
-
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
@@ -86,6 +81,13 @@
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
public class JObjectAccessors {
@@ -224,22 +226,27 @@
}
public static class JStringAccessor implements IJObjectAccessor {
+ private final ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
@Override
public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
throws HyracksDataException {
- byte[] b = pointable.getByteArray();
- int s = pointable.getStartOffset();
- int l = pointable.getLength();
-
- String v = null;
- v = AStringSerializerDeserializer.INSTANCE.deserialize(
- new DataInputStream(new ByteArrayInputStream(b, s+1, l-1))).getStringValue();
- //v = new String(b, s+1, l, "UTF-8");
- JObjectUtil.getNormalizedString(v);
-
IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
- ((JString) jObject).setValue(JObjectUtil.getNormalizedString(v));
+
+ try {
+ byte byteArray[] = pointable.getByteArray();
+ int len = pointable.getLength()-3;
+ int off = pointable.getStartOffset()+3;
+ baaos.reset();
+ if(off >= 0 && off <= byteArray.length && len >= 0 && off + len - byteArray.length <= 0) {
+ baaos.write(byteArray, off, len);
+ ((JString) jObject).setValue(JObjectUtil.getNormalizedString(baaos.toString("UTF-8")));
+ } else {
+ ((JString) jObject).setValue("");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
return jObject;
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index bfac636..a544638 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -20,20 +20,68 @@
public class Datatypes {
- public static final class Tweet {
+ /*
+ The following assumes this DDL (but ignoring the field name orders):
+
+ create type TwitterUser if not exists as open{
+ screen_name: string,
+ language: string,
+ friends_count: int32,
+ status_count: int32,
+ name: string,
+ followers_count: int32
+ };
+
+ create type Tweet if not exists as open{
+ id: string,
+ user: TwitterUser,
+ latitude:double,
+ longitude:double,
+ created_at:string,
+ message_text:string
+ };
+
+ */
+ public static class Tweet {
public static final String ID = "id";
public static final String USER = "user";
- public static final String MESSAGE = "message_text";
public static final String LATITUDE = "latitude";
public static final String LONGITUDE = "longitude";
public static final String CREATED_AT = "created_at";
- public static final String SCREEN_NAME = "screen_name";
+ public static final String MESSAGE = "message_text";
+
public static final String COUNTRY = "country";
+
+ // User fields (for the sub record "user")
+ public static final String SCREEN_NAME = "screen_name";
+ public static final String LANGUAGE = "language";
+ public static final String FRIENDS_COUNT = "friends_count";
+ public static final String STATUS_COUNT = "status_count";
+ public static final String NAME = "name";
+ public static final String FOLLOWERS_COUNT = "followers_count";
+
}
+
+ /*
+ The following assumes this DDL (but ignoring the field name orders):
+
+ create type ProcessedTweet if not exists as open {
+ id: string,
+ user_name:string,
+ location:point,
+ created_at:string,
+ message_text:string,
+ country: string,
+ topics: [string]
+ };
+
+ */
public static final class ProcessedTweet {
public static final String USER_NAME = "user_name";
public static final String LOCATION = "location";
public static final String TOPICS = "topics";
}
+
+
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
index cfb818d..f8914a6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
@@ -19,14 +19,20 @@
package org.apache.asterix.external.util;
import org.apache.asterix.external.library.java.JObjectUtil;
-import twitter4j.Status;
-import twitter4j.User;
+import org.apache.asterix.external.util.Datatypes.Tweet;
import org.apache.asterix.om.base.AMutableDouble;
import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableRecord;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import twitter4j.Status;
+import twitter4j.User;
+
+import java.util.HashMap;
+import java.util.Map;
public class TweetProcessor {
@@ -35,10 +41,15 @@
private AMutableRecord mutableRecord;
private AMutableRecord mutableUser;
+ private final Map<String, Integer> userFieldNameMap = new HashMap<>();
+ private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
+
+
public TweetProcessor(ARecordType recordType) {
+ initFieldNames(recordType);
mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
- mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[1], mutableUserFields);
+ mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)], mutableUserFields);
mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
@@ -46,32 +57,56 @@
}
+ // Initialize the hashmap values for the field names and positions
+ private void initFieldNames(ARecordType recordType) {
+ String tweetFields[] = recordType.getFieldNames();
+ for (int i=0; i<tweetFields.length; i++) {
+ tweetFieldNameMap.put(tweetFields[i], i);
+ if (tweetFields[i].equals(Tweet.USER)) {
+ IAType fieldType = recordType.getFieldTypes()[i];
+ if (fieldType.getTypeTag() == ATypeTag.RECORD) {
+ String userFields[] = ((ARecordType)fieldType).getFieldNames();
+ for (int j=0; j<userFields.length; j++) {
+ userFieldNameMap.put(userFields[j], j);
+ }
+ }
+
+ }
+ }
+ }
+
+
public AMutableRecord processNextTweet(Status tweet) {
User user = tweet.getUser();
- ((AMutableString) mutableUserFields[0]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
- ((AMutableString) mutableUserFields[1]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
- ((AMutableInt32) mutableUserFields[2]).setValue(user.getFriendsCount());
- ((AMutableInt32) mutableUserFields[3]).setValue(user.getStatusesCount());
- ((AMutableString) mutableUserFields[4]).setValue(JObjectUtil.getNormalizedString(user.getName()));
- ((AMutableInt32) mutableUserFields[5]).setValue(user.getFollowersCount());
- ((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + "");
+ // Tweet user data
+ ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
+ ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
+ ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
+ ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
+ ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)]).setValue(JObjectUtil.getNormalizedString(user.getName()));
+ ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]).setValue(user.getFollowersCount());
- for (int i = 0; i < 6; i++) {
- ((AMutableRecord) mutableTweetFields[1]).setValueAtPos(i, mutableUserFields[i]);
+
+ // Tweet data
+ ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
+
+ int userPos = tweetFieldNameMap.get(Tweet.USER);
+ for (int i = 0; i < mutableUserFields.length; i++) {
+ ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
}
if (tweet.getGeoLocation() != null) {
- ((AMutableDouble) mutableTweetFields[2]).setValue(tweet.getGeoLocation().getLatitude());
- ((AMutableDouble) mutableTweetFields[3]).setValue(tweet.getGeoLocation().getLongitude());
+ ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(tweet.getGeoLocation().getLatitude());
+ ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(tweet.getGeoLocation().getLongitude());
} else {
- ((AMutableDouble) mutableTweetFields[2]).setValue(0);
- ((AMutableDouble) mutableTweetFields[3]).setValue(0);
+ ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
+ ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
}
- ((AMutableString) mutableTweetFields[4]).setValue(JObjectUtil.getNormalizedString(
+ ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]).setValue(JObjectUtil.getNormalizedString(
tweet.getCreatedAt().toString()));
- ((AMutableString) mutableTweetFields[5]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+ ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
- for (int i = 0; i < 6; i++) {
+ for (int i = 0; i < mutableTweetFields.length; i++) {
mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
}
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
index 6e53578..bca508f 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
@@ -51,7 +51,10 @@
if (latitude != null && longitude != null) {
location.setValue(latitude.getValue(), longitude.getValue());
+ } else {
+ location.setValue(0, 0);
}
+
String[] tokens = text.getValue().split(" ");
for (String tk : tokens) {
if (tk.startsWith("#")) {
diff --git a/asterix-installer/pom.xml b/asterix-installer/pom.xml
index 1fdad71..3eb3dfa 100644
--- a/asterix-installer/pom.xml
+++ b/asterix-installer/pom.xml
@@ -271,6 +271,13 @@
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>0.8.7-SNAPSHOT</version>
+ <type>zip</type>
+ <classifier>binary-assembly</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
<artifactId>asterix-test-framework</artifactId>
<version>0.8.7-SNAPSHOT</version>
<scope>test</scope>
diff --git a/asterix-installer/src/main/assembly/binary-assembly.xml b/asterix-installer/src/main/assembly/binary-assembly.xml
index 56115f6..d25281e 100644
--- a/asterix-installer/src/main/assembly/binary-assembly.xml
+++ b/asterix-installer/src/main/assembly/binary-assembly.xml
@@ -124,5 +124,13 @@
<unpack>false</unpack>
<useTransitiveDependencies>false</useTransitiveDependencies>
</dependencySet>
+ <dependencySet>
+ <outputDirectory>extlibs</outputDirectory>
+ <includes>
+ <include>asterix-external-data:*:zip</include>
+ </includes>
+ <unpack>false</unpack>
+ <useTransitiveDependencies>false</useTransitiveDependencies>
+ </dependencySet>
</dependencySets>
</assembly>