external udf feed tests: checkpoint
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..2550be6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -14,9 +14,10 @@
*/
package edu.uci.ics.asterix.external.dataset.adapter;
-import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.UUID;
+import java.util.logging.Logger;
import twitter4j.Query;
import twitter4j.QueryResult;
@@ -41,15 +42,17 @@
private String keywords;
private Query query;
- private long id = 0;
private String id_prefix;
private Twitter twitter;
private int requestInterval = 10; // seconds
- private Queue<Tweet> tweetBuffer = new LinkedList<Tweet>();
+ private QueryResult result;
- IAObject[] mutableFields;
- String[] tupleFieldValues;
+ private IAObject[] mutableFields;
+ private String[] tupleFieldValues;
private ARecordType recordType;
+ private int nextTweetIndex = 0;
+
+ private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterFeedClient.class.getName());
public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
this.id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
@@ -59,32 +62,8 @@
recordType = adapter.getAdapterOutputType();
recordSerDe = new ARecordSerializerDeserializer(recordType);
mutableRecord = new AMutableRecord(recordType, mutableFields);
- initialize(adapter.getConfiguration());
tupleFieldValues = new String[recordType.getFieldNames().length];
- }
-
- public void initialize(Map<String, String> params) {
- this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
- this.query = new Query(keywords);
- query.setRpp(100);
- }
-
- private Tweet getNextTweet() throws TwitterException, InterruptedException {
- if (tweetBuffer.isEmpty()) {
- QueryResult result;
- Thread.sleep(1000 * requestInterval);
- result = twitter.search(query);
- tweetBuffer.addAll(result.getTweets());
- }
- return tweetBuffer.remove();
- }
-
- public ARecordType getRecordType() {
- return recordType;
- }
-
- public AMutableRecord getMutableRecord() {
- return mutableRecord;
+ initialize(adapter.getConfiguration());
}
@Override
@@ -95,8 +74,7 @@
return false;
}
int numFields = recordType.getFieldNames().length;
-
- tupleFieldValues[0] = id_prefix + ":" + id;
+ tupleFieldValues[0] = UUID.randomUUID().toString();
tupleFieldValues[1] = tweet.getFromUser();
tupleFieldValues[2] = tweet.getLocation() == null ? "" : tweet.getLocation();
tupleFieldValues[3] = tweet.getText();
@@ -105,7 +83,6 @@
((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
mutableRecord.setValueAtPos(i, mutableFields[i]);
}
- id++;
return true;
}
@@ -114,4 +91,29 @@
// TOOO: implement resetting logic for Twitter
}
+    /** Returns the adapter output record type held by this client. */
+    public ARecordType getRecordType() {
+        return this.recordType;
+    }
+
+    /** Returns the mutable record this client populates with tweet field values. */
+    public AMutableRecord getMutableRecord() {
+        return this.mutableRecord;
+    }
+
+    /**
+     * Reads the adapter configuration and builds the Twitter search query.
+     *
+     * @param params adapter configuration; {@code PullBasedTwitterAdapter.QUERY} supplies the
+     *               search keywords and {@code PullBasedTwitterAdapter.INTERVAL} optionally
+     *               supplies the request interval in seconds
+     * @throws NumberFormatException if an interval is configured but is not a valid integer
+     */
+    private void initialize(Map<String, String> params) {
+        this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+        // INTERVAL may be missing from the configuration. Integer.parseInt(null) throws
+        // NumberFormatException, so keep the field's default interval in that case.
+        String interval = params.get(PullBasedTwitterAdapter.INTERVAL);
+        if (interval != null) {
+            this.requestInterval = Integer.parseInt(interval.trim());
+        }
+        this.query = new Query(keywords);
+        query.setRpp(100);
+    }
+
+    /**
+     * Returns the next tweet of the current search result, issuing a new search (after
+     * waiting {@code requestInterval} seconds) whenever the current result is exhausted.
+     *
+     * @return the next tweet
+     * @throws TwitterException if the search request fails
+     * @throws InterruptedException if the thread is interrupted while waiting between requests
+     */
+    private Tweet getNextTweet() throws TwitterException, InterruptedException {
+        // Loop instead of testing once: a search may come back with no tweets, and indexing
+        // into an empty list would throw IndexOutOfBoundsException.
+        while (result == null || nextTweetIndex >= result.getTweets().size()) {
+            // 1000L keeps the millisecond computation in long arithmetic (no int overflow).
+            Thread.sleep(1000L * requestInterval);
+            result = twitter.search(query);
+            nextTweetIndex = 0;
+        }
+        List<Tweet> tw = result.getTweets();
+        return tw.get(nextTweetIndex++);
+    }
+
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/ParseTweetFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/ParseTweetFunction.java
index 431b367..de13e59 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/ParseTweetFunction.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/ParseTweetFunction.java
@@ -14,9 +14,6 @@
*/
package edu.uci.ics.asterix.external.library;
-import edu.uci.ics.asterix.external.library.IExternalScalarFunction;
-import edu.uci.ics.asterix.external.library.IFunctionHelper;
-import edu.uci.ics.asterix.external.library.java.JObjects.JInt;
import edu.uci.ics.asterix.external.library.java.JObjects.JOrderedList;
import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
import edu.uci.ics.asterix.external.library.java.JObjects.JString;
@@ -24,38 +21,40 @@
public class ParseTweetFunction implements IExternalScalarFunction {
- private JOrderedList list = null;
+ private JOrderedList list = null;
- @Override
- public void initialize(IFunctionHelper functionHelper) {
- list = new JOrderedList(functionHelper.getObject(JTypeTag.STRING));
- }
+    /**
+     * Creates the list that {@code evaluate} clears and refills on every call, built from a
+     * string-typed object obtained from the function helper.
+     */
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        this.list = new JOrderedList(functionHelper.getObject(JTypeTag.STRING));
+    }
- @Override
- public void deinitialize() {
- }
+    /** Intentionally empty: this function does nothing on deinitialization. */
+    @Override
+    public void deinitialize() {
+    }
- @Override
- public void evaluate(IFunctionHelper functionHelper) throws Exception {
- list.clear();
- JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
- JInt id = (JInt) inputRecord.getValueByName("id");
- JString text = (JString) inputRecord.getValueByName("text");
+    /**
+     * Builds the result record from the input tweet record: copies the id, username,
+     * location, text and timestamp fields and adds a "topics" list holding every
+     * space-separated token of the text that starts with '#'.
+     *
+     * @param functionHelper gives access to the input argument, reusable objects and the result
+     * @throws Exception propagated from the function helper or the record accessors
+     */
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord tweet = (JRecord) functionHelper.getArgument(0);
+        JString tweetId = (JString) tweet.getValueByName("id");
+        JString tweetText = (JString) tweet.getValueByName("text");
- String[] tokens = text.getValue().split(" ");
- for (String tk : tokens) {
- if (tk.startsWith("#")) {
- JString newField = (JString) functionHelper
- .getObject(JTypeTag.STRING);
- newField.setValue(tk);
- list.add(newField);
- }
- }
- JRecord result = (JRecord) functionHelper.getResultObject();
- result.setField("id", id);
- result.setField("text", text);
- result.setField("topics", list);
- functionHelper.setResult(result);
- }
+
+        // Collect hashtag tokens; anything not starting with '#' is skipped.
+        for (String token : tweetText.getValue().split(" ")) {
+            if (!token.startsWith("#")) {
+                continue;
+            }
+            JString hashtag = (JString) functionHelper.getObject(JTypeTag.STRING);
+            hashtag.setValue(token);
+            list.add(hashtag);
+        }
+
+        JRecord output = (JRecord) functionHelper.getResultObject();
+        output.setField("id", tweetId);
+        output.setField("username", tweet.getValueByName("username"));
+        output.setField("location", tweet.getValueByName("location"));
+        output.setField("text", tweetText);
+        output.setField("timestamp", tweet.getValueByName("timestamp"));
+        output.setField("topics", list);
+        functionHelper.setResult(output);
+    }
}