minor changes to pull based twitter adapter
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 81b73e7..6058bd2 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -36,11 +36,24 @@
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
private Map<String, String> configuration;
- private static ARecordType recordType;
+ private static ARecordType recordType = initOutputType();
+
+ private static ARecordType initOutputType() {
+ ARecordType recordType = null;
+ String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING };
+ try {
+ recordType = new ARecordType("TweetType", fieldNames, fieldTypes, false);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create adapter output type");
+ }
+ return recordType;
+ }
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- return new PullBasedTwitterAdapter(configuration, ctx);
+ return new PullBasedTwitterAdapter(configuration, recordType, ctx);
}
@Override
@@ -61,16 +74,6 @@
@Override
public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
- if (recordType != null) {
- String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.ASTRING };
- try {
- recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
- } catch (Exception e) {
- throw new IllegalStateException("Unable to create adapter output type");
- }
- }
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index e9cc21b..838cfeb 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -40,9 +40,9 @@
return tweetClient;
}
- public PullBasedTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) throws AsterixException {
+ public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
super(configuration, ctx);
- tweetClient = new PullBasedTwitterFeedClient(ctx, this);
+ tweetClient = new PullBasedTwitterFeedClient(ctx, recordType, this);
}
public ARecordType getAdapterOutputType() {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 75f8fba..2c8d659 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -49,11 +49,11 @@
private ARecordType recordType;
private int nextTweetIndex = 0;
- public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
+ public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PullBasedTwitterAdapter adapter) {
twitter = new TwitterFactory().getInstance();
mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
new AMutableString(null), new AMutableString(null) };
- recordType = adapter.getAdapterOutputType();
+ this.recordType = recordType;
recordSerDe = new ARecordSerializerDeserializer(recordType);
mutableRecord = new AMutableRecord(recordType, mutableFields);
tupleFieldValues = new String[recordType.getFieldNames().length];
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
index ae21939..a6be981 100644
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -59,9 +59,7 @@
@Test
public void test() throws Exception {
for (TestCaseContext testCaseCtx : testCaseCollection) {
- if (testCaseCtx.getTestCase().getCompilationUnit().get(0).getName().contains("ingest")) {
- TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null);
- }
+ TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null);
}
}
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
index 028ac59..8dfa98d 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
@@ -9,4 +9,6 @@
*/
use dataverse externallibtest;
+set wait-for-completion-feed "true";
+
connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
index bb42e62..4a8369b 100644
--- a/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
@@ -6,7 +6,7 @@
", "Language": "JAVA", "Kind": "SCALAR" }
{ "DataverseName": "externallibtest", "Name": "testlib#mysum", "Arity": "2", "Params": [ "AINT32", "AINT32" ], "ReturnType": "AINT32", "Definition": "edu.uci.ics.asterix.external.library.SumFactory
", "Language": "JAVA", "Kind": "SCALAR" }
-{ "DataverseName": "externallibtest", "Name": "testlib#parseTweet", "Arity": "1", "Params": [ "TweetType" ], "ReturnType": "TweetType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory
+{ "DataverseName": "externallibtest", "Name": "testlib#parseTweet", "Arity": "1", "Params": [ "TweetInputType" ], "ReturnType": "TweetOutputType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory
", "Language": "JAVA", "Kind": "SCALAR" }
{ "DataverseName": "externallibtest", "Name": "testlib#toUpper", "Arity": "1", "Params": [ "TextType" ], "ReturnType": "TextType", "Definition": "edu.uci.ics.asterix.external.library.UpperCaseFactory
", "Language": "JAVA", "Kind": "SCALAR" }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 3297c2d..e3d5bed 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -240,8 +240,8 @@
switch (adapterFactory.getAdapterType()) {
case TYPED:
- adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
((ITypedAdapterFactory) adapterFactory).configure(configuration);
+ adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
break;
case GENERIC:
String outputTypeName = configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME);