TweetParser Extension
This patch includes the following changes:
1. Extended TweetParser to parse more than a fixed set of attributes.
2. Changed the Twitter feed message unit from Status to String.
Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1002
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql
new file mode 100644
index 0000000..ddffc43
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : This test query checks the revised
+ * parser from two perspectives: the open Tweet
+ * type automatically includes all fields
+ * from the tweet status; the closed TwitterUser
+ * type includes only the specified fields.
+ * consumer.secret is deliberately omitted to
+ * trigger an exception.
+ * Expected Res : Failure
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUser as closed{
+ screen_name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32
+ };
+
+create type Tweet as open
+{
+ id: int64,
+ user: TwitterUser
+}
+
+create dataset Tweets (Tweet)
+primary key id;
+
+create feed TwitterFeed using push_twitter(
+("type-name"="Tweet"),
+("format"="twitter-status"),//
+("consumer.key"="************"),
+("access.token"="************"),
+("access.token.secret"="************"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql
new file mode 100644
index 0000000..66ede3f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+use dataverse feeds;
+connect feed TwitterFeed to dataset Tweets;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 6b63ff0..e5710bc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -150,6 +150,13 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
+ <compilation-unit name="revised-tweet-parser">
+ <output-dir compare="Text">revised-tweet-parser</output-dir>
+ <expected-error>One or more parameters are missing from adapter configuration</expected-error>
+ <expected-error>Unknown source feed</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="feed-with-external-parser">
<output-dir compare="Text">feed-with-external-parser</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index e31325a..5a7b4b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -27,14 +27,14 @@
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
-public class TwitterPullRecordReader implements IRecordReader<Status> {
+public class TwitterPullRecordReader implements IRecordReader<String> {
private Query query;
private Twitter twitter;
@@ -42,18 +42,19 @@
private QueryResult result;
private int nextTweetIndex = 0;
private long lastTweetIdReceived = 0;
- private GenericRecord<Status> record;
+ private GenericRecord<String> record;
public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) {
this.twitter = twitter;
this.requestInterval = requestInterval;
this.query = new Query(keywords);
this.query.setCount(100);
- this.record = new GenericRecord<Status>();
+ this.record = new GenericRecord<>();
}
@Override
public void close() throws IOException {
+ // do nothing
}
@Override
@@ -62,7 +63,7 @@
}
@Override
- public IRawRecord<Status> next() throws IOException, InterruptedException {
+ public IRawRecord<String> next() throws IOException, InterruptedException {
if (result == null || nextTweetIndex >= result.getTweets().size()) {
Thread.sleep(1000 * requestInterval);
query.setSinceId(lastTweetIdReceived);
@@ -79,7 +80,8 @@
if (lastTweetIdReceived < tweet.getId()) {
lastTweetIdReceived = tweet.getId();
}
- record.set(tweet);
+ String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); // transform tweet obj to json
+ record.set(jsonTweet);
return record;
} else {
return null;
@@ -93,10 +95,12 @@
@Override
public void setFeedLogManager(FeedLogManager feedLogManager) {
+ // do nothing
}
@Override
public void setController(AbstractFeedDataFlowController controller) {
+ // do nothing
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index f04cdb9..9ead8a9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,31 +26,31 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
-
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
+import twitter4j.TwitterObjectFactory;
import twitter4j.TwitterStream;
-public class TwitterPushRecordReader implements IRecordReader<Status> {
- private LinkedBlockingQueue<Status> inputQ;
+public class TwitterPushRecordReader implements IRecordReader<String> {
+ private LinkedBlockingQueue<String> inputQ;
private TwitterStream twitterStream;
- private GenericRecord<Status> record;
+ private GenericRecord<String> record;
private boolean closed = false;
public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
- record = new GenericRecord<Status>();
- inputQ = new LinkedBlockingQueue<Status>();
+ record = new GenericRecord<>();
+ inputQ = new LinkedBlockingQueue<>();
this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
this.twitterStream.addListener(new TweetListener(inputQ));
this.twitterStream.filter(query);
}
public TwitterPushRecordReader(TwitterStream twitterStream) {
- record = new GenericRecord<Status>();
- inputQ = new LinkedBlockingQueue<Status>();
+ record = new GenericRecord<>();
+ inputQ = new LinkedBlockingQueue<>();
this.twitterStream = twitterStream;//
this.twitterStream.addListener(new TweetListener(inputQ));
twitterStream.sample();
@@ -72,8 +72,8 @@
}
@Override
- public IRawRecord<Status> next() throws IOException, InterruptedException {
- Status tweet = inputQ.poll();
+ public IRawRecord<String> next() throws IOException, InterruptedException {
+ String tweet = inputQ.poll();
if (tweet == null) {
return null;
}
@@ -93,45 +93,52 @@
private class TweetListener implements StatusListener {
- private LinkedBlockingQueue<Status> inputQ;
+ private LinkedBlockingQueue<String> inputQ;
- public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+ public TweetListener(LinkedBlockingQueue<String> inputQ) {
this.inputQ = inputQ;
}
@Override
public void onStatus(Status tweet) {
- inputQ.add(tweet);
+ String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+ inputQ.add(jsonTweet);
}
@Override
public void onException(Exception arg0) {
-
+ // do nothing
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
+ // do nothing
}
@Override
public void onScrubGeo(long arg0, long arg1) {
+ // do nothing
}
@Override
public void onStallWarning(StallWarning arg0) {
+ // do nothing
}
@Override
public void onTrackLimitationNotice(int arg0) {
+ // do nothing
}
}
@Override
public void setFeedLogManager(FeedLogManager feedLogManager) {
+ // do nothing
}
@Override
public void setController(AbstractFeedDataFlowController controller) {
+ // do nothing
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 541737a..172b22b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -37,7 +37,7 @@
import twitter4j.FilterQuery;
import twitter4j.Status;
-public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
@@ -114,7 +114,7 @@
}
@Override
- public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
if (pull) {
return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
@@ -133,8 +133,8 @@
}
@Override
- public Class<? extends Status> getRecordClass() {
- return Status.class;
+ public Class<? extends String> getRecordClass() {
+ return String.class;
}
private boolean validateConfiguration(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 5923354..ab908bf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,12 +18,6 @@
*/
package org.apache.asterix.external.library.java;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
@@ -83,14 +77,20 @@
import org.apache.asterix.om.pointables.base.IVisitablePointable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.string.UTF8StringReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
public class JObjectAccessors {
public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
@@ -465,15 +465,18 @@
List<IVisitablePointable> fieldTypeTags = recordPointable.getFieldTypeTags();
List<IVisitablePointable> fieldNames = recordPointable.getFieldNames();
int index = 0;
- boolean closedPart = true;
+ boolean closedPart;
try {
IJObject fieldObject = null;
for (IVisitablePointable fieldPointable : fieldPointables) {
closedPart = index < recordType.getFieldTypes().length;
IVisitablePointable tt = fieldTypeTags.get(index);
- IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null;
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
.deserialize(tt.getByteArray()[tt.getStartOffset()]);
+ IAType fieldType;
+ fieldType = closedPart ?
+ recordType.getFieldTypes()[index] :
+ TypeTagUtil.getBuiltinTypeByTag(typeTag);
IVisitablePointable fieldName = fieldNames.get(index);
typeInfo.reset(fieldType, typeTag);
switch (typeTag) {
@@ -486,8 +489,8 @@
// value is null
fieldObject = null;
} else {
- fieldObject = pointableVisitor.visit((AListVisitablePointable) fieldPointable,
- typeInfo);
+ fieldObject = pointableVisitor
+ .visit((AListVisitablePointable) fieldPointable, typeInfo);
}
break;
case ANY:
@@ -536,15 +539,16 @@
List<IVisitablePointable> items = pointable.getItems();
List<IVisitablePointable> itemTags = pointable.getItemTags();
JList list = pointable.ordered() ? new JOrderedList(listType) : new JUnorderedList(listType);
- IJObject listItem = null;
+ IJObject listItem;
int index = 0;
try {
-
for (IVisitablePointable itemPointable : items) {
IVisitablePointable itemTagPointable = itemTags.get(index);
ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
.deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]);
- typeInfo.reset(listType.getType(), listType.getTypeTag());
+ IAType fieldType;
+ fieldType = TypeTagUtil.getBuiltinTypeByTag(itemTypeTag);
+ typeInfo.reset(fieldType, itemTypeTag);
switch (itemTypeTag) {
case RECORD:
listItem = pointableVisitor.visit((ARecordVisitablePointable) itemPointable, typeInfo);
@@ -557,10 +561,7 @@
throw new IllegalArgumentException(
"Cannot parse list item of type " + listType.getTypeTag());
default:
- IAType itemType = ((AbstractCollectionType) listType).getItemType();
- typeInfo.reset(itemType, itemType.getTypeTag());
listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
-
}
list.add(listItem);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 522da06..8d483dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -18,111 +18,236 @@
*/
package org.apache.asterix.external.parser;
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.builders.RecordBuilderFactory;
+import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.base.AMutablePoint;
+import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
-import twitter4j.Status;
-import twitter4j.User;
+import java.io.DataOutput;
+import java.io.IOException;
-public class TweetParser implements IRecordDataParser<Status> {
-
- private IAObject[] mutableTweetFields;
- private IAObject[] mutableUserFields;
- private AMutableRecord mutableRecord;
- private AMutableRecord mutableUser;
- private final Map<String, Integer> userFieldNameMap = new HashMap<>();
- private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
- private RecordBuilder recordBuilder = new RecordBuilder();
+public class TweetParser extends AbstractDataParser implements IRecordDataParser<String> {
+ private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<>(
+ new RecordBuilderFactory());
+ private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<>(
+ new ListBuilderFactory());
+ private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<>(
+ new AbvsBuilderFactory());
+ private ARecordType recordType;
+ private UTF8StringWriter utf8Writer = new UTF8StringWriter();
public TweetParser(ARecordType recordType) {
- initFieldNames(recordType);
- mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
- new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
- mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)],
- mutableUserFields);
-
- mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
- new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
- mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+ this.recordType = recordType;
+ aPoint = new AMutablePoint(0, 0);
}
- // Initialize the hashmap values for the field names and positions
- private void initFieldNames(ARecordType recordType) {
- String tweetFields[] = recordType.getFieldNames();
- for (int i = 0; i < tweetFields.length; i++) {
- tweetFieldNameMap.put(tweetFields[i], i);
- if (tweetFields[i].equals(Tweet.USER)) {
- IAType fieldType = recordType.getFieldTypes()[i];
- if (fieldType.getTypeTag() == ATypeTag.RECORD) {
- String userFields[] = ((ARecordType) fieldType).getFieldNames();
- for (int j = 0; j < userFields.length; j++) {
- userFieldNameMap.put(userFields[j], j);
- }
- }
+ private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException, JSONException {
+ ArrayBackedValueStorage itemBuffer = getTempBuffer();
+ UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
+ unorderedListBuilder.reset(null);
+ for (int iter1 = 0; iter1 < jArray.length(); iter1++) {
+ itemBuffer.reset();
+ if (writeField(jArray.get(iter1), null, itemBuffer.getDataOutput())) {
+ unorderedListBuilder.addItem(itemBuffer);
}
}
+ unorderedListBuilder.write(output, true);
+ }
+
+ private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out) throws IOException, JSONException {
+ boolean writeResult = true;
+ if (fieldType != null) {
+ switch (fieldType.getTypeTag()) {
+ case STRING:
+ out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+ utf8Writer.writeUTF8(fieldObj.toString(), out);
+ break;
+ case INT64:
+ aInt64.setValue((long) fieldObj);
+ int64Serde.serialize(aInt64, out);
+ break;
+ case INT32:
+ out.write(BuiltinType.AINT32.getTypeTag().serialize());
+ out.writeInt((Integer) fieldObj);
+ break;
+ case DOUBLE:
+ out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+ out.writeDouble((Double) fieldObj);
+ break;
+ case BOOLEAN:
+ out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+ out.writeBoolean((Boolean) fieldObj);
+ break;
+ case RECORD:
+ writeRecord((JSONObject) fieldObj, out, (ARecordType) fieldType);
+ break;
+ default:
+ writeResult = false;
+ }
+ } else {
+ if (fieldObj == JSONObject.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else if (fieldObj instanceof Integer) {
+ out.write(BuiltinType.AINT32.getTypeTag().serialize());
+ out.writeInt((Integer) fieldObj);
+ } else if (fieldObj instanceof Boolean) {
+ out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+ out.writeBoolean((Boolean) fieldObj);
+ } else if (fieldObj instanceof Double) {
+ out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+ out.writeDouble((Double) fieldObj);
+ } else if (fieldObj instanceof Long) {
+ out.write(BuiltinType.AINT64.getTypeTag().serialize());
+ out.writeLong((Long) fieldObj);
+ } else if (fieldObj instanceof String) {
+ out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+ utf8Writer.writeUTF8((String) fieldObj, out);
+ } else if (fieldObj instanceof JSONArray) {
+ if (((JSONArray) fieldObj).length() != 0) {
+ parseUnorderedList((JSONArray) fieldObj, out);
+ } else {
+ writeResult = false;
+ }
+ } else if (fieldObj instanceof JSONObject) {
+ if (((JSONObject) fieldObj).length() != 0) {
+ writeRecord((JSONObject) fieldObj, out, null);
+ } else {
+ writeResult = false;
+ }
+ }
+ }
+ return writeResult;
+ }
+
+ private int checkAttrNameIdx(String[] nameList, String name) {
+ int idx = 0;
+ if (nameList != null) {
+ for (String nln : nameList) {
+ if (name.equals(nln)) {
+ return idx;
+ }
+ idx++;
+ }
+ }
+ return -1;
+ }
+
+ public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType) throws IOException, JSONException {
+ IAType[] curTypes = null;
+ String[] curFNames = null;
+ int fieldN;
+ int attrIdx;
+
+ ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
+ ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
+ IARecordBuilder recBuilder = getRecordBuilder();
+
+ if (curRecType != null) {
+ curTypes = curRecType.getFieldTypes();
+ curFNames = curRecType.getFieldNames();
+ }
+
+ recBuilder.reset(curRecType);
+ recBuilder.init();
+
+ if (curRecType != null && !curRecType.isOpen()) {
+ // closed record type
+ fieldN = curFNames.length;
+ for (int iter1 = 0; iter1 < fieldN; iter1++) {
+ fieldValueBuffer.reset();
+ DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
+ if (obj.isNull(curFNames[iter1])) {
+ if (curRecType.isClosedField(curFNames[iter1])) {
+ throw new HyracksDataException("Closed field " + curFNames[iter1] + " has null value.");
+ } else {
+ continue;
+ }
+ } else {
+ if (writeField(obj.get(curFNames[iter1]), curTypes[iter1], fieldOutput)) {
+ recBuilder.addField(iter1, fieldValueBuffer);
+ }
+ }
+ }
+ } else {
+ //open record type
+ int closedFieldCount = 0;
+ IAType curFieldType = null;
+ for (String attrName : JSONObject.getNames(obj)) {
+ if (obj.isNull(attrName) || obj.length() == 0) {
+ continue;
+ }
+ attrIdx = checkAttrNameIdx(curFNames, attrName);
+ if (curRecType != null) {
+ curFieldType = curRecType.getFieldType(attrName);
+ }
+ fieldValueBuffer.reset();
+ fieldNameBuffer.reset();
+ DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
+ if (writeField(obj.get(attrName), curFieldType, fieldOutput)) {
+ if (attrIdx == -1) {
+ aString.setValue(attrName);
+ stringSerde.serialize(aString, fieldNameBuffer.getDataOutput());
+ recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
+ } else {
+ recBuilder.addField(attrIdx, fieldValueBuffer);
+ closedFieldCount++;
+ }
+ }
+ }
+ if (curRecType != null && closedFieldCount < curFNames.length) {
+ throw new HyracksDataException("Non-null field is null");
+ }
+ }
+ recBuilder.write(out, true);
+ }
+
+ private IARecordBuilder getRecordBuilder() {
+ return recordBuilderPool.allocate(ATypeTag.RECORD);
+ }
+
+ private IAsterixListBuilder getUnorderedListBuilder() {
+ return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
+ }
+
+ private ArrayBackedValueStorage getTempBuffer() {
+ return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
}
@Override
- public void parse(IRawRecord<? extends Status> record, DataOutput out) throws HyracksDataException {
- Status tweet = record.get();
- User user = tweet.getUser();
- // Tweet user data
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)])
- .setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)])
- .setValue(JObjectUtil.getNormalizedString(user.getLang()));
- ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
- ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)])
- .setValue(JObjectUtil.getNormalizedString(user.getName()));
- ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)])
- .setValue(user.getFollowersCount());
-
- // Tweet data
- ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
- int userPos = tweetFieldNameMap.get(Tweet.USER);
- for (int i = 0; i < mutableUserFields.length; i++) {
- ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
+ public void parse(IRawRecord<? extends String> record, DataOutput out) throws HyracksDataException {
+ try {
+ //TODO get rid of this temporary json
+ resetPools();
+ JSONObject jsObj = new JSONObject(record.get());
+ writeRecord(jsObj, out, recordType);
+ } catch (JSONException | IOException e) {
+ throw new HyracksDataException(e);
}
- if (tweet.getGeoLocation() != null) {
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)])
- .setValue(tweet.getGeoLocation().getLatitude());
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)])
- .setValue(tweet.getGeoLocation().getLongitude());
- } else {
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
- ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
- }
- ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)])
- .setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString()));
- ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)])
- .setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+ }
- for (int i = 0; i < mutableTweetFields.length; i++) {
- mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
- }
- recordBuilder.reset(mutableRecord.getType());
- recordBuilder.init();
- IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+ private void resetPools() {
+ listBuilderPool.reset();
+ recordBuilderPool.reset();
+ abvsBuilderPool.reset();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index d6e536d..3539f6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -28,7 +28,7 @@
import twitter4j.Status;
-public class TweetParserFactory implements IRecordDataParserFactory<Status> {
+public class TweetParserFactory implements IRecordDataParserFactory<String> {
private static final long serialVersionUID = 1L;
private ARecordType recordType;
@@ -44,18 +44,19 @@
}
@Override
- public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) {
+ public IRecordDataParser<String> createRecordParser(IHyracksTaskContext ctx) {
TweetParser dataParser = new TweetParser(recordType);
return dataParser;
}
@Override
- public Class<? extends Status> getRecordClass() {
- return Status.class;
+ public Class<? extends String> getRecordClass() {
+ return String.class;
}
@Override
public void setMetaType(ARecordType metaType) {
+ // do nothing
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index e1a7911..94d7b53 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.util;
+import org.apache.asterix.external.parser.TweetParser;
+
public class Datatypes {
/*
@@ -42,16 +44,37 @@
public static class Tweet {
public static final String ID = "id";
public static final String USER = "user";
+ public static final String GEOLOCATION = "geo";
+ public static final String CREATED_AT = "created_at";
+ public static final String TEXT = "text";
+ public static final String COUNTRY = "country";
+ public static final String PLACE = "place";
+ public static final String SOURCE = "source";
+ public static final String TRUNCATED = "truncated";
+ public static final String IN_REPLY_TO_STATUS_ID = "in_reply_to_status_id";
+ public static final String IN_REPLY_TO_USER_ID = "in_reply_to_user_id";
+ public static final String IN_REPLY_TO_SCREENNAME = "in_reply_to_screen_name";
+ public static final String FAVORITED = "favorited";
+ public static final String RETWEETED = "retweeted";
+ public static final String FAVORITE_COUNT = "favorite_count";
+ public static final String RETWEET_COUNT = "retweet_count";
+ public static final String CONTRIBUTORS = "contributors";
+ public static final String LANGUAGE = "lang";
+ public static final String FILTER_LEVEL = "filter_level";
+ public static final String TIMESTAMP_MS = "timestamp_ms";
+ public static final String IS_QUOTE_STATUS = "is_quote_status";
+ // in API but not in JSON
+ public static final String SENSITIVE = "sensitive";
+ public static final String RETWEETED_BY_ME = "retweeted_by_me";
+ public static final String CURRENT_USER_RETWEET_ID = "current_user_retweet_id";
+
+ // retained for consistency
+ public static final String MESSAGE = "message_text";
public static final String LATITUDE = "latitude";
public static final String LONGITUDE = "longitude";
- public static final String CREATED_AT = "created_at";
- public static final String MESSAGE = "message_text";
-
- public static final String COUNTRY = "country";
-
// User fields (for the sub record "user")
public static final String SCREEN_NAME = "screen_name";
- public static final String LANGUAGE = "language";
+ public static final String USER_PREFERRED_LANGUAGE = "user_preferred_language";
public static final String FRIENDS_COUNT = "friends_count";
public static final String STATUS_COUNT = "status_count";
public static final String NAME = "name";
@@ -59,6 +82,73 @@
}
+ public static final class Tweet_Place { // field-name constants; presumably the keys of a tweet's "place" sub-object (TODO confirm)
+ public static final String ID = "id";
+ public static final String URL = "url";
+ public static final String PLACE_TYPE = "place_type";
+ public static final String NAME = "name";
+ public static final String FULL_NAME = "full_name";
+ public static final String COUNTRY_CODE = "country_code";
+ public static final String COUNTRY = "country";
+ public static final String BOUNDING_BOX = "bounding_box";
+ public static final String ATTRIBUTES = "attributes";
+
+ private Tweet_Place() { // private: this class only holds constants and is not meant to be instantiated
+ }
+ }
+
+ public static final class Tweet_User { // field-name constants for the "user" sub-record of a tweet; values are lower-case snake_case keys
+ public static final String ID = "id";
+ public static final String NAME = "name";
+ public static final String SCREEN_NAME = "screen_name";
+ public static final String LOCATION = "location";
+ public static final String DESCRIPTION = "description";
+ public static final String CONTRIBUTORS_ENABLED = "contributors_enabled";
+ public static final String PROFILE_IMAGE_URL = "profile_image_url";
+ public static final String PROFILE_IMAGE_URL_HTTPS = "profile_image_url_https";
+ public static final String URL = "url";
+ public static final String PROTECTED = "protected";
+ public static final String FOLLOWERS_COUNT = "followers_count";
+ public static final String PROFILE_BACKGROUND_COLOR = "profile_background_color";
+ public static final String PROFILE_TEXT_COLOR = "profile_text_color";
+ public static final String PROFILE_LINK_COLOR = "profile_link_color";
+ public static final String PROFILE_SIDEBAR_FILL_COLOR = "profile_sidebar_fill_color";
+ public static final String PROFILE_SIDEBAR_BORDER_COLOR = "profile_sidebar_border_color";
+ public static final String PROFILE_USE_BACKGROUND_IMAGE = "profile_use_background_image";
+ public static final String DEFAULT_PROFILE = "default_profile";
+ public static final String DEFAULT_PROFILE_IMAGE = "default_profile_image";
+ public static final String FRIENDS_COUNT = "friends_count";
+ public static final String CREATED_AT = "created_at"; // lower-case, like Tweet.CREATED_AT and every other key in this class
+ public static final String FAVOURITES_COUNT = "favourites_count";
+ public static final String UTC_OFFSET = "utc_offset";
+ public static final String TIME_ZONE = "time_zone";
+ public static final String PROFILE_BACKGROUND_IMAGE_URL = "profile_background_image_url";
+ public static final String PROFILE_BACKGROUND_IMAGE_URL_HTTPS = "profile_background_image_url_https";
+ public static final String PROFILE_BANNER_URL = "profile_banner_url";
+ public static final String LANG = "lang";
+ public static final String STATUSES_COUNT = "statuses_count";
+ public static final String GEO_ENABLED = "geo_enabled";
+ public static final String VERIFIED = "verified";
+ public static final String IS_TRANSLATOR = "is_translator";
+ public static final String LISTED_COUNT = "listed_count";
+ public static final String FOLLOW_REQUEST_SENT = "follow_request_sent";
+ // Entities are skipped; the attributes below are in the API but not in the JSON
+ public static final String WITHHELD_IN_COUNTRIES = "withheld_in_countries";
+ public static final String BIGGER_PROFILE_IMAGE_URL = "bigger_profile_image_url";
+ public static final String MINI_PROFILE_IMAGE_URL = "mini_profile_image_url";
+ public static final String ORIGINAL_PROFILE_IMAGE_URL = "original_profile_image_url";
+ public static final String SHOW_ALL_INLINE_MEDIA = "show_all_inline_media";
+ public static final String PROFILE_BANNER_RETINA_URL = "profile_banner_retina_url";
+ public static final String PROFILE_BANNER_IPAD_URL = "profile_banner_ipad_url";
+ public static final String PROFILE_BANNER_IPAD_RETINA_URL = "profile_banner_ipad_retina_url";
+ public static final String PROFILE_BANNER_MOBILE_URL = "profile_banner_mobile_url";
+ public static final String PROFILE_BANNER_MOBILE_RETINA_URL = "profile_banner_mobile_retina_url";
+ public static final String PROFILE_BACKGROUND_TILED = "profile_background_tiled";
+
+ private Tweet_User() { // private: this class only holds constants and is not meant to be instantiated
+ }
+ }
+
/*
The following assumes this DDL (but ignoring the field name orders):
create type ProcessedTweet if not exists as open {
@@ -76,5 +166,4 @@
public static final String LOCATION = "location";
public static final String TOPICS = "topics";
}
-
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e251f32..b666487 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -170,7 +170,7 @@
public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
- public static final String ALIAS_RSS_ADAPTER = "rss_feed";
+ public static final String ALIAS_RSS_ADAPTER = "rss";
public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
@@ -223,7 +223,7 @@
public static final String KEY_STREAM_SOURCE = "stream-source";
public static final String EXTERNAL = "external";
public static final String KEY_READER_FACTORY = "reader-factory";
- public static final String READER_RSS = "rss";
+ public static final String READER_RSS = "rss_feed";
public static final String FORMAT_CSV = "csv";
public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 4fb602b..70d31c0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -46,12 +46,12 @@
public static final String LOCATION_EU = "Europe";
public static final String LANGUAGES = "languages"; // languages to track
public static final String TRACK = "keywords"; // terms to track
- public static final String FILTER_LEVEL = "filter-level";
+ public static final String FILTER_LEVEL = "filter-level";
}
public static class GeoConstants {
- public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
- public static final double[][] EU = new double[][]{{-29.7,36.7},{79.2,72.0}};
+ private static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } }; // two points of two coordinates each; presumably {longitude, latitude} of opposite box corners (TODO confirm)
+ private static final double[][] EU = new double[][] { { -29.7, 36.7 }, { 79.2, 72.0 } }; // same layout as US
public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
}
@@ -83,24 +83,25 @@
if (m.matches()) {
String[] locationStrings = locationValue.trim().split(";\\s*");
- locations = new double[locationStrings.length*2][2];
+ locations = new double[locationStrings.length * 2][2];
- for(int i=0; i<locationStrings.length; i++) {
+ for (int i = 0; i < locationStrings.length; i++) {
if (locationStrings[i].contains(",")) {
String[] coordinatesString = locationStrings[i].split(",");
- for(int k=0; k < 2; k++) {
+ for (int k = 0; k < 2; k++) {
for (int l = 0; l < 2; l++) {
try {
locations[2 * i + k][l] = Double.parseDouble(coordinatesString[2 * k + l]);
} catch (NumberFormatException ne) {
- throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * k + l]);
+ throw new AsterixException(
+ "Incorrect coordinate value " + coordinatesString[2 * k + l]);
}
}
}
} else if (GeoConstants.boundingBoxes.containsKey(locationStrings[i])) {
// Only add known locations
double loc[][] = GeoConstants.boundingBoxes.get(locationStrings[i]);
- for(int k=0; k < 2; k++) {
+ for (int k = 0; k < 2; k++) {
for (int l = 0; l < 2; l++) {
locations[2 * i + k][l] = loc[k][l];
}
@@ -132,7 +133,7 @@
if (trackValue.contains(",")) {
keywords = trackValue.trim().split(",\\s*");
} else {
- keywords = new String[] {trackValue};
+ keywords = new String[] { trackValue };
}
filterQuery = filterQuery.track(keywords);
}
@@ -146,7 +147,7 @@
if (langValue.contains(",")) {
languages = langValue.trim().split(",\\s*");
} else {
- languages = new String[] {langValue};
+ languages = new String[] { langValue };
}
filterQuery = filterQuery.language(languages);
}
@@ -163,9 +164,9 @@
}
// Filtering level: none, low or medium (default=none)
- if(filterQuery != null) {
+ if (filterQuery != null) {
String filterValue = configuration.get(ConfigurationConstants.FILTER_LEVEL);
- if (filterValue!=null) {
+ if (filterValue != null) {
filterQuery = filterQuery.filterLevel(filterValue);
}
}
@@ -188,8 +189,11 @@
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
LOGGER.warning(builder.toString());
- LOGGER.warning("Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials");
- LOGGER.warning("For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth/overview/application-owner-access-tokens");
+ LOGGER.warning(
+ "Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials");
+ LOGGER.warning(
+ "For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth"
+ + "/overview/application-owner-access-tokens");
}
}
Twitter twitter = tf.getInstance();
@@ -205,6 +209,7 @@
private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
+ cb.setJSONStoreEnabled(true);
String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
index e16633e..a5af127 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.om.types;
+import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.IAObject;
import org.json.JSONException;
import org.json.JSONObject;
@@ -26,6 +27,8 @@
private static final long serialVersionUID = 1L;
+ public static final AOrderedListType FULL_OPEN_ORDEREDLIST_TYPE = new AOrderedListType(null,""); // null item type => untyped list, empty type name; NOTE(review): sibling constants are named FULLY_OPEN_*
+
/**
* @param itemType
* if null, the list is untyped
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
index 80b13b5..febc6ad 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.om.types;
+import org.apache.asterix.om.base.AUnorderedList;
import org.apache.asterix.om.base.IAObject;
import org.json.JSONException;
import org.json.JSONObject;
@@ -26,6 +27,8 @@
private static final long serialVersionUID = 1L;
+ public static final AUnorderedListType FULLY_OPEN_UNORDEREDLIST_TYPE = new AUnorderedListType(null,""); // null item type => untyped collection, empty type name
+
/**
* @param itemType
* if null, the collection is untyped
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
index 2a50506..b6c5712 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
@@ -80,6 +80,12 @@
return BuiltinType.ADAYTIMEDURATION;
case UUID:
return BuiltinType.AUUID;
+ case RECORD:
+ return ARecordType.FULLY_OPEN_RECORD_TYPE;
+ case UNORDEREDLIST:
+ return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
+ case ORDEREDLIST:
+ return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
default:
throw new AsterixException("Typetag " + typeTag + " is not a built-in type");
}