Fix for Issue 929
commit 58317b37a3a7b2546b4780f4427ae1ed21a4ece9
Author: Ubuntu <raman@ramangro.ramangro.d3.internal.cloudapp.net>
Date: Fri Aug 14 10:47:06 2015 +0000
Fix for Issue 929:
a) Added documentation for use of OAuth keys and tokens when using the built-in Twitter adaptor
b) Modified RSS feed adaptor and added documentation
Change-Id: I5521287a4fa1818c78a4f83b1a3cabeea8e6096d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/354
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index dad5975..c34c297 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -187,6 +187,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -2129,10 +2130,10 @@
// All Metadata checks have passed. Feed connect request is valid. //
FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
- Pair<FeedConnectionRequest, Boolean> p = getFeedConnectionRequest(dataverseName, feed,
+ Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName, feed,
cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
- FeedConnectionRequest connectionRequest = p.first;
- boolean createFeedIntakeJob = p.second;
+ FeedConnectionRequest connectionRequest = triple.first;
+ boolean createFeedIntakeJob = triple.second;
FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
subscriberRegistered = true;
@@ -2142,6 +2143,11 @@
feedId.getDataverse(), feedId.getFeedName());
Pair<JobSpecification, IFeedAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
metadataProvider, policyAccessor);
+ // adapter configuration are valid at this stage
+ // register the feed joints (these are auto-de-registered)
+ for (IFeedJoint fj : triple.third){
+ FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
+ }
runJob(hcc, pair.first, false);
IFeedAdapterFactory adapterFactory = pair.second;
if (adapterFactory.isRecordTrackingEnabled()) {
@@ -2149,6 +2155,10 @@
adapterFactory.createIntakeProgressTracker());
}
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+ } else {
+ for (IFeedJoint fj : triple.third){
+ FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
+ }
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -2193,7 +2203,7 @@
* @return
* @throws MetadataException
*/
- private Pair<FeedConnectionRequest, Boolean> getFeedConnectionRequest(String dataverse, Feed feed, String dataset,
+ private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse, Feed feed, String dataset,
FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx) throws MetadataException {
IFeedJoint sourceFeedJoint = null;
FeedConnectionRequest request = null;
@@ -2235,9 +2245,6 @@
ConnectionLocation.SOURCE_FEED_COMPUTE_STAGE, FeedJointType.COMPUTE, connectionId);
jointsToRegister.add(computeFeedJoint);
}
- for (IFeedJoint joint : jointsToRegister) {
- FeedLifecycleListener.INSTANCE.registerFeedJoint(joint);
- }
} else {
sourceFeedJoint = FeedLifecycleListener.INSTANCE.getFeedJoint(feedJointKey);
connectionLocation = sourceFeedJoint.getConnectionLocation();
@@ -2250,7 +2257,7 @@
dataset, feedPolicy.getPolicyName(), feedPolicy.getProperties(), feed.getFeedId());
sourceFeedJoint.addConnectionRequest(request);
- return new Pair<FeedConnectionRequest, Boolean>(request, needIntakeJob);
+ return new Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>>(request, needIntakeJob, jointsToRegister);
}
/*
diff --git a/asterix-app/src/test/resources/metadata/results/basic/meta15.adm b/asterix-app/src/test/resources/metadata/results/basic/meta15.adm
index ec29b68..237ac06 100644
--- a/asterix-app/src/test/resources/metadata/results/basic/meta15.adm
+++ b/asterix-app/src/test/resources/metadata/results/basic/meta15.adm
@@ -4,6 +4,7 @@
, { "DataverseName": "Metadata", "Name": "hive", "Classname": "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "localfs", "Classname": "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "pull_twitter", "Classname": "edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
+, { "DataverseName": "Metadata", "Name": "push_twitter", "Classname": "edu.uci.ics.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "rss_feed", "Classname": "edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "socket_adapter", "Classname": "edu.uci.ics.asterix.tools.external.data.GenericSocketFeedAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "socket_client", "Classname": "edu.uci.ics.asterix.tools.external.data.SocketClientAdapterFactory", "Type": "INTERNAL", "Timestamp": "Wed Nov 20 14:45:58 IST 2013" }
diff --git a/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm b/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
index 2bc2ca1..c00c88f 100644
--- a/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
+++ b/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
@@ -5,6 +5,7 @@
, { "DataverseName": "Metadata", "Name": "hive", "Classname": "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "localfs", "Classname": "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "pull_twitter", "Classname": "edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
+, { "DataverseName": "Metadata", "Name": "push_twitter", "Classname": "edu.uci.ics.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "rss_feed", "Classname": "edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "socket_adapter", "Classname": "edu.uci.ics.asterix.tools.external.data.GenericSocketFeedAdapterFactory", "Type": "INTERNAL", "Timestamp": "Tue Jul 16 22:38:45 PDT 2013" }
, { "DataverseName": "Metadata", "Name": "socket_client", "Classname": "edu.uci.ics.asterix.tools.external.data.SocketClientAdapterFactory", "Type": "INTERNAL", "Timestamp": "Wed Nov 20 14:45:58 IST 2013" }
diff --git a/asterix-doc/src/site/markdown/feeds/tutorial.md b/asterix-doc/src/site/markdown/feeds/tutorial.md
index 886d29d..009e2b1 100644
--- a/asterix-doc/src/site/markdown/feeds/tutorial.md
+++ b/asterix-doc/src/site/markdown/feeds/tutorial.md
@@ -1,19 +1,21 @@
# Support for Data Ingestion in AsterixDB #
-## <a id="toc">Table of Contents</a> ##
+## <a id="toc">Table of Contents</a> ##
* [Introduction](#Introduction)
* [Data Feed Basics](#DataFeedBasics)
* [Collecting Data: Feed Adaptors](#FeedAdaptors)
* [Preprocessing Collected Data](#PreprocessingCollectedData)
- * [Creating an External Dataset](#IntroductionCreatingAnExternalDataset)
+    * [Writing an External UDF](#WritingAnExternalUDF)
+ * [Installing an AsterixDB Library](#InstallingAnAsterixDBLibrary)
-## <a id="Introduction">Introduction</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
-In this document, we describe the support for data ingestion in AsterixDB, an open-source Big Data Management System (BDMS) that provides a platform for storage and analysis of large volumes of semi-structured data. Data feeds are a new mechanism for having
-continuous data arrive into a BDMS from external sources and incrementally populate a persisted dataset and associated indexes. We add a new BDMS architectural component, called a data feed, that makes a Big Data system the caretaker for functionality that
+
+## <a name="Introduction">Introduction</a> ##
+
+In this document, we describe the support for data ingestion in AsterixDB, an open-source Big Data Management System (BDMS) that provides a platform for storage and analysis of large volumes of semi-structured data. Data feeds are a new mechanism for having continuous data arrive into a BDMS from external sources and incrementally populate a persisted dataset and associated indexes. We add a new BDMS architectural component, called a data feed, that makes a Big Data system the caretaker for functionality that
used to live outside, and we show how it improves users’ lives and system performance.
-### <a id="DataFeedBasics">Data Feed Basics</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ###
+## <a name="DataFeedBasics">Data Feed Basics</a> ##
####Collecting Data: Feed Adaptors####
The functionality of establishing a connection with a data source
@@ -34,9 +36,12 @@
provides a generic socket-based adaptor that can be used
to ingest data that is directed at a prescribed socket.
-Next, we consider creating an example feed that consists of tweets obtained from
-the Twitter service. To do so, we use the built-in push-based Twitter adaptor.
-To being with, we must define a Tweet using the AsterixDB Data Model (ADM) and the AsterixDB Query Language (AQL). Given below are the type definitions in AQL that create a Tweet datatype which is representative of a real tweet as obtained from Twitter.
+
+In this tutorial, we shall describe building two example data ingestion pipelines that cover the popular scenario of ingesting data from (a) Twitter and (b) RSS Feed source.
+
+####Ingesting Twitter Stream
+We shall use the built-in push-based Twitter adaptor.
+As a pre-requisite, we must define a Tweet using the AsterixDB Data Model (ADM) and the AsterixDB Query Language (AQL). Given below are the type definitions in AQL that create a Tweet datatype which is representative of a real tweet as obtained from Twitter.
create dataverse feeds;
use dataverse feeds;
@@ -58,16 +63,79 @@
message_text:string
};
+ create dataset Tweets (Tweet)
+ primary key id;
+We also create a dataset that we shall use to persist the tweets in AsterixDB.
Next we make use of the create feed AQL statement to define our example data feed.
- create feed TwitterFeed if not exists using "push_twitter"
- (("type-name"="Tweet"),("location"="US"));
+#####Using the "push_twitter" feed adapter#####
+The push_twitter adaptor requires setting up an application account with Twitter. To retrieve
+tweets, Twitter requires registering an application with Twitter. Registration involves providing a name and a brief description for the application. Each application has an associated OAuth authentication credential that includes OAuth keys and tokens. Accessing the
+Twitter API requires providing the following.
+1. Consumer Key (API Key)
+2. Consumer Secret (API Secret)
+3. Access Token
+4. Access Token Secret
+
+The "push_twitter" adaptor takes as configuration the above mentioned parameters. End-user(s) are required to obtain the above authentication credentials prior to using the "push_twitter" adaptor. For further information on obtaining OAuth keys and tokens and registering an application with Twitter, please visit http://apps.twitter.com
+
+Given below is an example AQL statement that creates a feed - TwitterFeed by using the
+"push_twitter" adaptor.
+
+
+
+ create feed TwitterFeed if not exists using "push_twitter"
+ (("type-name"="Tweet"),
+ ("consumer.key"="************"),
+ ("consumer.secret"="**************"),
+ ("access.token"="**********"),
+ ("access.token.secret"="*************"));
+
+Valid values must be provided for all of the above authentication parameters.
Note that the create feed statement does not initiate the flow of data from Twitter into our AsterixDB instance. Instead, the create feed statement only results in registering the feed with AsterixDB. The flow of data along a feed is initiated when it is connected
to a target dataset using the connect feed statement (which we shall revisit later).
-####Preprocessing Collected Data####
+
+####Ingesting an RSS Feed
+RSS (Rich Site Summary; originally RDF Site Summary; often called Really Simple Syndication) uses a family of standard web feed formats to publish frequently updated information: blog entries, news headlines, audio, video. An RSS document (called "feed", "web feed", or "channel") includes full or summarized text, and metadata, like publishing date and author's name. RSS feeds enable publishers to syndicate data automatically.
+
+
+#####Using the "rss_feed" feed adapter#####
+AsterixDB provides a built-in feed adaptor that allows retrieving data given a collection of RSS end point URLs. As observed in the case of ingesting tweets, it is required to model an RSS data item using AQL.
+
+ create dataverse feeds if not exists;
+ use dataverse feeds;
+
+ create type Rss if not exists as open{
+ id: string,
+ title: string,
+ description: string,
+ link: string
+ };
+
+ create dataset RssDataset (Rss)
+ primary key id;
+
+
+Next, we define an RSS feed using our built-in adaptor "rss_feed".
+
+ create feed my_feed using
+ rss_feed (
+ ("type-name"="Rss"),
+ ("url"="http://rss.cnn.com/rss/edition.rss")
+ );
+
+In the above definition, the configuration parameter "url" can be a comma separated list that reflects a collection of RSS URLs, where each URL corresponds to an RSS endpoint or a RSS feed.
+The "rss_feed" adaptor retrieves data from each of the specified RSS URLs (comma separated values) in parallel.
+
+
+So far, we have discussed the mechanism for retrieving data from the external world into the AsterixDB system. However, the arriving data may require certain pre-processing prior to being persisted in AsterixDB storage. Next, we discuss how the arriving data can be pre-processed.
+
+
+
+## <a id="PreprocessingCollectedData">Preprocessing Collected Data</a> ###
A feed definition may optionally include the specification of a
user-defined function that is to be applied to each feed record prior
to persistence. Examples of pre-processing might include adding
@@ -86,8 +154,7 @@
reason about an AQL UDF and involve the use of indexes during
its invocation.
-The tweets collected by the Twitter Adaptor (push_twiiter) (Figure 4) conform
-to the Tweet datatype (defined earlier). We consider an example transformation of a raw tweet into its lightweight version - ProcessedTweet - which is defined next.
+We consider an example transformation of a raw tweet into its lightweight version - ProcessedTweet - which is defined next.
create type ProcessedTweet if not exists as open {
id: string,
@@ -100,13 +167,13 @@
};
-The processing required in transforming a collected tweet to its lighter version (of type ProcessedTweet) involves extracting the topicsor hash-tags (if any) in a tweet
+The processing required in transforming a collected tweet to its lighter version (of type ProcessedTweet) involves extracting the topics or hash-tags (if any) in a tweet
and collecting them in the referred-topics attribute for the tweet.
Additionally, the latitude and longitude values (doubles) are combined into the spatial point type. Note that spatial data types are considered as first class citizens that come with the support for creating indexes. Next we show a revised version of our example TwitterFeed that involves the use of a UDF. We assume that the UDF that contains the transformation logic into a ProcessedTweet is avaialable as a Java UDF inside an AsterixDB library named 'testlib'. We defer the writing of a Java UDF and its installation as part of an AsterixDB library to a later section of this document.
create feed ProcessedTwitterFeed if not exists
using "push_twitter"
- (("type-name"="Tweet"),("location"="US"));
+ (("type-name"="Tweet"));
apply function testlib#processRawTweet;
Note that a feed adaptor and a UDF act as pluggable components. These
@@ -209,14 +276,22 @@
provides a set of built-in policies, each constructed by setting
appropriate value(s) for the policy parameter(s) from the table below.
-Policy Parameter | Description | Default Value
-------------------|------------|---------------|
-excess.records.spill | Set to true if records that cannot be processed by an operator for lack of resources (referred to as excess records hereafter) should be persisted to the local disk for deferred processing. | false |
-excess.records.discard | Set to true if excess records should be discarded. | false |
-excess.records.throttle | Set to true if rate of arrival of records is required to be reduced in an adaptive manner to prevent having any excess records. | false |
-excess.records.elastic | Set to true if the system should attempt to resolve resource bottlenecks by re-structuring and/or rescheduling the feed ingestion pipeline. | false |
-recover.soft.failure | Set to true if the feed must attempt to survive any runtime exception. A false value permits an early termination of a feed in such an event. | true |
-recover.hard.failure | Set to true if the feed must attempt to survive a hardware failures (loss of AsterixDB node(s)). A false value permits the early termination of a feed in the event of a hardware failure | |
+
+
+####Policy Parameters
+
+
+- *excess.records.spill*: Set to true if records that cannot be processed by an operator for lack of resources (referred to as excess records hereafter) should be persisted to the local disk for deferred processing. (Default: false)
+
+- *excess.records.discard*: Set to true if excess records should be discarded. (Default: false)
+
+- *excess.records.throttle*: Set to true if rate of arrival of records is required to be reduced in an adaptive manner to prevent having any excess records. (Default: false)
+
+- *excess.records.elastic*: Set to true if the system should attempt to resolve resource bottlenecks by re-structuring and/or rescheduling the feed ingestion pipeline. (Default: false)
+
+- *recover.soft.failure*: Set to true if the feed must attempt to survive any runtime exception. A false value permits an early termination of a feed in such an event. (Default: true)
+
+- *recover.hard.failure*: Set to true if the feed must attempt to survive hardware failures (loss of AsterixDB node(s)). A false value permits the early termination of a feed in the event of a hardware failure. (Default: false)
Note that the end user may choose to form a custom policy. E.g.
it is possible in AsterixDB to create a custom policy that spills excess
@@ -232,7 +307,8 @@
using policy Basic ;
-####Writing an External UDF####
+## <a id="WritingAnExternalUDF">Writing an External UDF</a> ###
+
A Java UDF in AsterixDB is required to implement an prescribe interface. We shall next write a basic UDF that extracts the hashtags contained in the tweet's text and appends each into an unordered list. The list is added as an additional attribute to the tweet to form the augment version - ProcessedTweet.
package edu.uci.ics.asterix.external.library;
@@ -285,4 +361,87 @@
}
+## <a id="CreatingAnAsterixDBLibrary">Creating an AsterixDB Library</a> ###
+We need to install our Java UDF so that we may use it in AQL statements/queries. An AsterixDB library has a pre-defined structure which is as follows.
+
+
+- jar file: A jar file that would contain the class files for your UDF source code.
+- library descriptor.xml: This is a descriptor that provides meta-information about the library.
+
+ <externalLibrary xmlns="library">
+ <language>JAVA</language>
+ <libraryFunctions>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>addFeatures</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsFactory
+ </definition>
+ </libraryFunction>
+ </libraryFunctions>
+ </externalLibrary>
+
+
+- lib: other dependency jars
+
+If the Java UDF requires additional dependency jars, add them under a "lib" folder.
+
+We create a zip bundle that contains the jar file and the library descriptor xml file. The zip would have the following structure.
+
+ $ unzip -l ./tweetlib.zip
+ Archive: ./tweetlib.zip
+ Length Date Time Name
+ -------- ---- ---- ----
+ 760817 04-23-14 17:16 hash-tags.jar
+ 405 04-23-14 17:16 tweet.xml
+ -------- -------
+ 761222 2 files
+
+
+### <a id="InstallingAnAsterixDBLibrary">Installing an AsterixDB Library</a> ###
+
+We assume you have followed the [instructions](http://asterixdb.ics.uci.edu/documentation/install.html) to set up a running AsterixDB instance. Let us refer to your AsterixDB instance by the name "my_asterix".
+
+
+- Step 1: Stop the AsterixDB instance if it is in the ACTIVE state.
+
+ $ managix stop -n my_asterix
+
+
+- Step 2: Install the library using Managix install command. Just to illustrate, we use the help command to look up the syntax
+
+ $ managix help -cmd install
+ Installs a library to an asterix instance.
+ Options
+ n Name of Asterix Instance
+ d Name of the dataverse under which the library will be installed
+ l Name of the library
+ p Path to library zip bundle
+
+
+Above is a sample output and explains the usage and the required parameters. Each library has a name and is installed under a dataverse. Recall that we had created a dataverse by the name - "feeds" prior to creating our datatypes and dataset. We shall name our library - "testlib".
+
+We assume you have a library zip bundle that needs to be installed.
+To install the library, use the Managix install command. An example is shown below.
+
+ $ managix install -n my_asterix -d feeds -l testlib -p <put the absolute path of the library zip bundle here>
+
+You should see the following message:
+
+ INFO: Installed library testlib
+
+We shall next start our AsterixDB instance using the start command as shown below.
+
+ $ managix start -n my_asterix
+
+You may now use the AsterixDB library in AQL statements and queries. To look at the installed artifacts, you may execute the following query at the AsterixDB web-console.
+
+ for $x in dataset Metadata.Function
+ return $x
+
+ for $x in dataset Metadata.Library
+ return $x
+
+Our library is now installed and is ready to be used.
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
index 8af49fb..f77efbf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
@@ -20,6 +20,7 @@
import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
import edu.uci.ics.asterix.external.dataset.adapter.PushBasedTwitterAdapter;
import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.AuthenticationConstants;
import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
@@ -62,6 +63,16 @@
this.outputType = outputType;
this.configuration = configuration;
TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+ boolean requiredParamsSpecified = validateConfiguration(configuration);
+ if(!requiredParamsSpecified){
+ StringBuilder builder = new StringBuilder();
+ builder.append("One or more parameters are missing from adapter configuration\n");
+ builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+ builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+ throw new Exception(builder.toString());
+ }
}
@Override
@@ -80,4 +91,16 @@
return null;
}
+    /**
+     * Reports whether the adapter configuration has a non-null value for each of the
+     * four OAuth entries: consumer key, consumer secret, access token and access
+     * token secret.
+     *
+     * @param configuration the adapter configuration to inspect
+     * @return true when all four entries are present, false as soon as one is absent
+     */
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        String[] requiredKeys = { AuthenticationConstants.OAUTH_CONSUMER_KEY,
+                AuthenticationConstants.OAUTH_CONSUMER_SECRET, AuthenticationConstants.OAUTH_ACCESS_TOKEN,
+                AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET };
+        for (String requiredKey : requiredKeys) {
+            if (configuration.get(requiredKey) == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 4ab7d22..baab239 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -32,7 +32,7 @@
private static final long serialVersionUID = 1L;
- private static final String KEY_RSS_URL = "rss_url";
+ private static final String KEY_RSS_URL = "url";
private List<String> feedURLs = new ArrayList<String>();
private String id_prefix = "";
@@ -46,6 +46,7 @@
super(configuration, ctx);
id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
this.recordType = recordType;
+ reconfigure(configuration);
}
private void initializeFeedURLs(String rssURLProperty) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
index 8a74963..2737582 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -18,6 +18,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.logging.Logger;
+import java.util.logging.Level;
import twitter4j.FilterQuery;
import twitter4j.Twitter;
@@ -29,6 +31,9 @@
public class TwitterUtil {
+
+ private static Logger LOGGER = Logger.getLogger(TwitterUtil.class.getName());
+
public static class ConfigurationConstants {
public static final String KEY_LOCATION = "location";
public static final String LOCATION_US = "US";
@@ -77,7 +82,22 @@
+    /**
+     * Creates a Twitter client from the authentication settings derived from the
+     * given adapter configuration.
+     *
+     * @param configuration adapter configuration passed to getAuthConfiguration
+     * @return the Twitter instance produced by the configured TwitterFactory
+     * @throws IllegalStateException if the TwitterFactory cannot be constructed from
+     *             the configuration; the original failure is attached as the cause
+     */
     public static Twitter getTwitterService(Map<String, String> configuration) {
         ConfigurationBuilder cb = getAuthConfiguration(configuration);
-        TwitterFactory tf = new TwitterFactory(cb.build());
+        TwitterFactory tf;
+        try {
+            tf = new TwitterFactory(cb.build());
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                StringBuilder builder = new StringBuilder();
+                builder.append("Twitter Adapter requires the following config parameters\n");
+                builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+                builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+                builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+                builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+                LOGGER.warning(builder.toString());
+                LOGGER.warning("Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials");
+                LOGGER.warning("For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth/overview/application-owner-access-tokens");
+            }
+            // Fail here with the cause attached: swallowing the exception would leave the
+            // factory unset and surface later as an unexplained NullPointerException.
+            throw new IllegalStateException(
+                    "Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials", e);
+        }
         Twitter twitter = tf.getInstance();
         return twitter;
     }
@@ -132,8 +152,10 @@
break;
}
} catch (Exception e) {
- throw new AsterixException("Incorrect configuration! unable to load authentication credentials "
- + e.getMessage());
+ if(LOGGER.isLoggable(Level.WARNING)){
+ LOGGER.warning("unable to load authentication credentials from auth.properties file" +
+ "credential information will be obtained from adapter's configuration");
+ }
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 97c90f8..62688de 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -323,6 +323,7 @@
"edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory",
"edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory",
"edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
+ "edu.uci.ics.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
"edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory",
"edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory",
"edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",