TweetParser Extension

This patch includes following changes:

1. ExtendedTweetParser to parse more than fix attributes.

2. Changed the twitter feeds message unit from Status to String.

Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1002
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql
new file mode 100644
index 0000000..ddffc43
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : This test query will check the revised
+ *                parser from two perspective: open Tweet
+ *                will automatically includes all fields
+ *                from tweet status; closed TwitterUser
+ *                type will only includes specified fields.
+ *                consumer.secret is missing here to create
+ *                exception.
+ * Expected Res : Failure
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUser as closed{
+        screen_name: string,
+        lang: string,
+        friends_count: int32,
+        statuses_count: int32
+    };
+
+create type Tweet as open
+{
+  id: int64,
+  user: TwitterUser
+}
+
+create dataset Tweets (Tweet)
+primary key id;
+
+create feed TwitterFeed using push_twitter(
+("type-name"="Tweet"),
+("format"="twitter-status"),//
+("consumer.key"="************"),
+("access.token"="************"),
+("access.token.secret"="************"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql
new file mode 100644
index 0000000..66ede3f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+use dataverse feeds;
+connect feed TwitterFeed to dataset Tweets;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 6b63ff0..e5710bc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -150,6 +150,13 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="revised-tweet-parser">
+        <output-dir compare="Text">revised-tweet-parser</output-dir>
+        <expected-error>One or more parameters are missing from adapter configuration</expected-error>
+        <expected-error>Unknown source feed</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="feed-with-external-parser">
         <output-dir compare="Text">feed-with-external-parser</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index e31325a..5a7b4b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -27,14 +27,14 @@
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 import twitter4j.Query;
 import twitter4j.QueryResult;
 import twitter4j.Status;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
 
-public class TwitterPullRecordReader implements IRecordReader<Status> {
+public class TwitterPullRecordReader implements IRecordReader<String> {
 
     private Query query;
     private Twitter twitter;
@@ -42,18 +42,19 @@
     private QueryResult result;
     private int nextTweetIndex = 0;
     private long lastTweetIdReceived = 0;
-    private GenericRecord<Status> record;
+    private GenericRecord<String> record;
 
     public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) {
         this.twitter = twitter;
         this.requestInterval = requestInterval;
         this.query = new Query(keywords);
         this.query.setCount(100);
-        this.record = new GenericRecord<Status>();
+        this.record = new GenericRecord<>();
     }
 
     @Override
     public void close() throws IOException {
+        // do nothing
     }
 
     @Override
@@ -62,7 +63,7 @@
     }
 
     @Override
-    public IRawRecord<Status> next() throws IOException, InterruptedException {
+    public IRawRecord<String> next() throws IOException, InterruptedException {
         if (result == null || nextTweetIndex >= result.getTweets().size()) {
             Thread.sleep(1000 * requestInterval);
             query.setSinceId(lastTweetIdReceived);
@@ -79,7 +80,8 @@
             if (lastTweetIdReceived < tweet.getId()) {
                 lastTweetIdReceived = tweet.getId();
             }
-            record.set(tweet);
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); // transform tweet obj to json
+            record.set(jsonTweet);
             return record;
         } else {
             return null;
@@ -93,10 +95,12 @@
 
     @Override
     public void setFeedLogManager(FeedLogManager feedLogManager) {
+        // do nothing
     }
 
     @Override
     public void setController(AbstractFeedDataFlowController controller) {
+        // do nothing
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index f04cdb9..9ead8a9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,31 +26,31 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
-
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
 import twitter4j.StatusDeletionNotice;
 import twitter4j.StatusListener;
+import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
 
-public class TwitterPushRecordReader implements IRecordReader<Status> {
-    private LinkedBlockingQueue<Status> inputQ;
+public class TwitterPushRecordReader implements IRecordReader<String> {
+    private LinkedBlockingQueue<String> inputQ;
     private TwitterStream twitterStream;
-    private GenericRecord<Status> record;
+    private GenericRecord<String> record;
     private boolean closed = false;
 
     public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
-        record = new GenericRecord<Status>();
-        inputQ = new LinkedBlockingQueue<Status>();
+        record = new GenericRecord<>();
+        inputQ = new LinkedBlockingQueue<>();
         this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
         this.twitterStream.addListener(new TweetListener(inputQ));
         this.twitterStream.filter(query);
     }
 
     public TwitterPushRecordReader(TwitterStream twitterStream) {
-        record = new GenericRecord<Status>();
-        inputQ = new LinkedBlockingQueue<Status>();
+        record = new GenericRecord<>();
+        inputQ = new LinkedBlockingQueue<>();
         this.twitterStream = twitterStream;//
         this.twitterStream.addListener(new TweetListener(inputQ));
         twitterStream.sample();
@@ -72,8 +72,8 @@
     }
 
     @Override
-    public IRawRecord<Status> next() throws IOException, InterruptedException {
-        Status tweet = inputQ.poll();
+    public IRawRecord<String> next() throws IOException, InterruptedException {
+        String tweet = inputQ.poll();
         if (tweet == null) {
             return null;
         }
@@ -93,45 +93,52 @@
 
     private class TweetListener implements StatusListener {
 
-        private LinkedBlockingQueue<Status> inputQ;
+        private LinkedBlockingQueue<String> inputQ;
 
-        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+        public TweetListener(LinkedBlockingQueue<String> inputQ) {
             this.inputQ = inputQ;
         }
 
         @Override
         public void onStatus(Status tweet) {
-            inputQ.add(tweet);
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            inputQ.add(jsonTweet);
         }
 
         @Override
         public void onException(Exception arg0) {
-
+            // do nothing
         }
 
         @Override
         public void onDeletionNotice(StatusDeletionNotice arg0) {
+            // do nothing
         }
 
         @Override
         public void onScrubGeo(long arg0, long arg1) {
+            // do nothing
         }
 
         @Override
         public void onStallWarning(StallWarning arg0) {
+            // do nothing
         }
 
         @Override
         public void onTrackLimitationNotice(int arg0) {
+            // do nothing
         }
     }
 
     @Override
     public void setFeedLogManager(FeedLogManager feedLogManager) {
+        // do nothing
     }
 
     @Override
     public void setController(AbstractFeedDataFlowController controller) {
+        // do nothing
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 541737a..172b22b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -37,7 +37,7 @@
 import twitter4j.FilterQuery;
 import twitter4j.Status;
 
-public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
@@ -114,7 +114,7 @@
     }
 
     @Override
-    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         if (pull) {
             return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
@@ -133,8 +133,8 @@
     }
 
     @Override
-    public Class<? extends Status> getRecordClass() {
-        return Status.class;
+    public Class<? extends String> getRecordClass() {
+        return String.class;
     }
 
     private boolean validateConfiguration(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 5923354..ab908bf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,12 +18,6 @@
  */
 package org.apache.asterix.external.library.java;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
@@ -83,14 +77,20 @@
 import org.apache.asterix.om.pointables.base.IVisitablePointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.string.UTF8StringReader;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
 public class JObjectAccessors {
 
     public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
@@ -465,15 +465,18 @@
             List<IVisitablePointable> fieldTypeTags = recordPointable.getFieldTypeTags();
             List<IVisitablePointable> fieldNames = recordPointable.getFieldNames();
             int index = 0;
-            boolean closedPart = true;
+            boolean closedPart;
             try {
                 IJObject fieldObject = null;
                 for (IVisitablePointable fieldPointable : fieldPointables) {
                     closedPart = index < recordType.getFieldTypes().length;
                     IVisitablePointable tt = fieldTypeTags.get(index);
-                    IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null;
                     ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
                             .deserialize(tt.getByteArray()[tt.getStartOffset()]);
+                    IAType fieldType;
+                    fieldType = closedPart ?
+                            recordType.getFieldTypes()[index] :
+                            TypeTagUtil.getBuiltinTypeByTag(typeTag);
                     IVisitablePointable fieldName = fieldNames.get(index);
                     typeInfo.reset(fieldType, typeTag);
                     switch (typeTag) {
@@ -486,8 +489,8 @@
                                 // value is null
                                 fieldObject = null;
                             } else {
-                                fieldObject = pointableVisitor.visit((AListVisitablePointable) fieldPointable,
-                                        typeInfo);
+                                fieldObject = pointableVisitor
+                                        .visit((AListVisitablePointable) fieldPointable, typeInfo);
                             }
                             break;
                         case ANY:
@@ -536,15 +539,16 @@
             List<IVisitablePointable> items = pointable.getItems();
             List<IVisitablePointable> itemTags = pointable.getItemTags();
             JList list = pointable.ordered() ? new JOrderedList(listType) : new JUnorderedList(listType);
-            IJObject listItem = null;
+            IJObject listItem;
             int index = 0;
             try {
-
                 for (IVisitablePointable itemPointable : items) {
                     IVisitablePointable itemTagPointable = itemTags.get(index);
                     ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
                             .deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]);
-                    typeInfo.reset(listType.getType(), listType.getTypeTag());
+                    IAType fieldType;
+                    fieldType = TypeTagUtil.getBuiltinTypeByTag(itemTypeTag);
+                    typeInfo.reset(fieldType, itemTypeTag);
                     switch (itemTypeTag) {
                         case RECORD:
                             listItem = pointableVisitor.visit((ARecordVisitablePointable) itemPointable, typeInfo);
@@ -557,10 +561,7 @@
                             throw new IllegalArgumentException(
                                     "Cannot parse list item of type " + listType.getTypeTag());
                         default:
-                            IAType itemType = ((AbstractCollectionType) listType).getItemType();
-                            typeInfo.reset(itemType, itemType.getTypeTag());
                             listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
-
                     }
                     list.add(listItem);
                 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 522da06..8d483dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -18,111 +18,236 @@
  */
 package org.apache.asterix.external.parser;
 
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.builders.RecordBuilderFactory;
+import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.base.AMutablePoint;
+import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
-import twitter4j.Status;
-import twitter4j.User;
+import java.io.DataOutput;
+import java.io.IOException;
 
-public class TweetParser implements IRecordDataParser<Status> {
-
-    private IAObject[] mutableTweetFields;
-    private IAObject[] mutableUserFields;
-    private AMutableRecord mutableRecord;
-    private AMutableRecord mutableUser;
-    private final Map<String, Integer> userFieldNameMap = new HashMap<>();
-    private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
-    private RecordBuilder recordBuilder = new RecordBuilder();
+public class TweetParser extends AbstractDataParser implements IRecordDataParser<String> {
+    private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<>(
+            new RecordBuilderFactory());
+    private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<>(
+            new ListBuilderFactory());
+    private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<>(
+            new AbvsBuilderFactory());
+    private ARecordType recordType;
+    private UTF8StringWriter utf8Writer = new UTF8StringWriter();
 
     public TweetParser(ARecordType recordType) {
-        initFieldNames(recordType);
-        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
-                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
-        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)],
-                mutableUserFields);
-
-        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
-                new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
-        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+        this.recordType = recordType;
+        aPoint = new AMutablePoint(0, 0);
     }
 
-    // Initialize the hashmap values for the field names and positions
-    private void initFieldNames(ARecordType recordType) {
-        String tweetFields[] = recordType.getFieldNames();
-        for (int i = 0; i < tweetFields.length; i++) {
-            tweetFieldNameMap.put(tweetFields[i], i);
-            if (tweetFields[i].equals(Tweet.USER)) {
-                IAType fieldType = recordType.getFieldTypes()[i];
-                if (fieldType.getTypeTag() == ATypeTag.RECORD) {
-                    String userFields[] = ((ARecordType) fieldType).getFieldNames();
-                    for (int j = 0; j < userFields.length; j++) {
-                        userFieldNameMap.put(userFields[j], j);
-                    }
-                }
+    private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException, JSONException {
+        ArrayBackedValueStorage itemBuffer = getTempBuffer();
+        UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
 
+        unorderedListBuilder.reset(null);
+        for (int iter1 = 0; iter1 < jArray.length(); iter1++) {
+            itemBuffer.reset();
+            if (writeField(jArray.get(iter1), null, itemBuffer.getDataOutput())) {
+                unorderedListBuilder.addItem(itemBuffer);
             }
         }
+        unorderedListBuilder.write(output, true);
+    }
+
+    private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out) throws IOException, JSONException {
+        boolean writeResult = true;
+        if (fieldType != null) {
+            switch (fieldType.getTypeTag()) {
+                case STRING:
+                    out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                    utf8Writer.writeUTF8(fieldObj.toString(), out);
+                    break;
+                case INT64:
+                    aInt64.setValue((long) fieldObj);
+                    int64Serde.serialize(aInt64, out);
+                    break;
+                case INT32:
+                    out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                    out.writeInt((Integer) fieldObj);
+                    break;
+                case DOUBLE:
+                    out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                    out.writeDouble((Double) fieldObj);
+                    break;
+                case BOOLEAN:
+                    out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                    out.writeBoolean((Boolean) fieldObj);
+                    break;
+                case RECORD:
+                    writeRecord((JSONObject) fieldObj, out, (ARecordType) fieldType);
+                    break;
+                default:
+                    writeResult = false;
+            }
+        } else {
+            if (fieldObj == JSONObject.NULL) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else if (fieldObj instanceof Integer) {
+                out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                out.writeInt((Integer) fieldObj);
+            } else if (fieldObj instanceof Boolean) {
+                out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                out.writeBoolean((Boolean) fieldObj);
+            } else if (fieldObj instanceof Double) {
+                out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                out.writeDouble((Double) fieldObj);
+            } else if (fieldObj instanceof Long) {
+                out.write(BuiltinType.AINT64.getTypeTag().serialize());
+                out.writeLong((Long) fieldObj);
+            } else if (fieldObj instanceof String) {
+                out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                utf8Writer.writeUTF8((String) fieldObj, out);
+            } else if (fieldObj instanceof JSONArray) {
+                if (((JSONArray) fieldObj).length() != 0) {
+                    parseUnorderedList((JSONArray) fieldObj, out);
+                } else {
+                    writeResult = false;
+                }
+            } else if (fieldObj instanceof JSONObject) {
+                if (((JSONObject) fieldObj).length() != 0) {
+                    writeRecord((JSONObject) fieldObj, out, null);
+                } else {
+                    writeResult = false;
+                }
+            }
+        }
+        return writeResult;
+    }
+
+    private int checkAttrNameIdx(String[] nameList, String name) {
+        int idx = 0;
+        if (nameList != null) {
+            for (String nln : nameList) {
+                if (name.equals(nln)) {
+                    return idx;
+                }
+                idx++;
+            }
+        }
+        return -1;
+    }
+
+    public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType) throws IOException, JSONException {
+        IAType[] curTypes = null;
+        String[] curFNames = null;
+        int fieldN;
+        int attrIdx;
+
+        ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
+        ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
+        IARecordBuilder recBuilder = getRecordBuilder();
+
+        if (curRecType != null) {
+            curTypes = curRecType.getFieldTypes();
+            curFNames = curRecType.getFieldNames();
+        }
+
+        recBuilder.reset(curRecType);
+        recBuilder.init();
+
+        if (curRecType != null && !curRecType.isOpen()) {
+            // closed record type
+            fieldN = curFNames.length;
+            for (int iter1 = 0; iter1 < fieldN; iter1++) {
+                fieldValueBuffer.reset();
+                DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
+                if (obj.isNull(curFNames[iter1])) {
+                    if (curRecType.isClosedField(curFNames[iter1])) {
+                        throw new HyracksDataException("Closed field " + curFNames[iter1] + " has null value.");
+                    } else {
+                        continue;
+                    }
+                } else {
+                    if (writeField(obj.get(curFNames[iter1]), curTypes[iter1], fieldOutput)) {
+                        recBuilder.addField(iter1, fieldValueBuffer);
+                    }
+                }
+            }
+        } else {
+            //open record type
+            int closedFieldCount = 0;
+            IAType curFieldType = null;
+            for (String attrName : JSONObject.getNames(obj)) {
+                if (obj.isNull(attrName) || obj.length() == 0) {
+                    continue;
+                }
+                attrIdx = checkAttrNameIdx(curFNames, attrName);
+                if (curRecType != null) {
+                    curFieldType = curRecType.getFieldType(attrName);
+                }
+                fieldValueBuffer.reset();
+                fieldNameBuffer.reset();
+                DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
+                if (writeField(obj.get(attrName), curFieldType, fieldOutput)) {
+                    if (attrIdx == -1) {
+                        aString.setValue(attrName);
+                        stringSerde.serialize(aString, fieldNameBuffer.getDataOutput());
+                        recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
+                    } else {
+                        recBuilder.addField(attrIdx, fieldValueBuffer);
+                        closedFieldCount++;
+                    }
+                }
+            }
+            if (curRecType != null && closedFieldCount < curFNames.length) {
+                throw new HyracksDataException("Non-null field is null");
+            }
+        }
+        recBuilder.write(out, true);
+    }
+
+    private IARecordBuilder getRecordBuilder() {
+        return recordBuilderPool.allocate(ATypeTag.RECORD);
+    }
+
+    private IAsterixListBuilder getUnorderedListBuilder() {
+        return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
+    }
+
+    private ArrayBackedValueStorage getTempBuffer() {
+        return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
     }
 
     @Override
-    public void parse(IRawRecord<? extends Status> record, DataOutput out) throws HyracksDataException {
-        Status tweet = record.get();
-        User user = tweet.getUser();
-        // Tweet user data
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)])
-                .setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)])
-                .setValue(JObjectUtil.getNormalizedString(user.getLang()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)])
-                .setValue(JObjectUtil.getNormalizedString(user.getName()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)])
-                .setValue(user.getFollowersCount());
-
-        // Tweet data
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
-        int userPos = tweetFieldNameMap.get(Tweet.USER);
-        for (int i = 0; i < mutableUserFields.length; i++) {
-            ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
+    public void parse(IRawRecord<? extends String> record, DataOutput out) throws HyracksDataException {
+        try {
+            //TODO get rid of this temporary json
+            resetPools();
+            JSONObject jsObj = new JSONObject(record.get());
+            writeRecord(jsObj, out, recordType);
+        } catch (JSONException | IOException e) {
+            throw new HyracksDataException(e);
         }
-        if (tweet.getGeoLocation() != null) {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)])
-                    .setValue(tweet.getGeoLocation().getLatitude());
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)])
-                    .setValue(tweet.getGeoLocation().getLongitude());
-        } else {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
-        }
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)])
-                .setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString()));
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)])
-                .setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+    }
 
-        for (int i = 0; i < mutableTweetFields.length; i++) {
-            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
-        }
-        recordBuilder.reset(mutableRecord.getType());
-        recordBuilder.init();
-        IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+    private void resetPools() {
+        listBuilderPool.reset();
+        recordBuilderPool.reset();
+        abvsBuilderPool.reset();
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index d6e536d..3539f6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -28,7 +28,7 @@
 
 import twitter4j.Status;
 
-public class TweetParserFactory implements IRecordDataParserFactory<Status> {
+public class TweetParserFactory implements IRecordDataParserFactory<String> {
 
     private static final long serialVersionUID = 1L;
     private ARecordType recordType;
@@ -44,18 +44,19 @@
     }
 
     @Override
-    public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) {
+    public IRecordDataParser<String> createRecordParser(IHyracksTaskContext ctx) {
         TweetParser dataParser = new TweetParser(recordType);
         return dataParser;
     }
 
     @Override
-    public Class<? extends Status> getRecordClass() {
-        return Status.class;
+    public Class<? extends String> getRecordClass() {
+        return String.class;
     }
 
     @Override
     public void setMetaType(ARecordType metaType) {
+        // do nothing
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index e1a7911..94d7b53 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.util;
 
+import org.apache.asterix.external.parser.TweetParser;
+
 public class Datatypes {
 
     /*
@@ -42,16 +44,37 @@
     public static class Tweet {
         public static final String ID = "id";
         public static final String USER = "user";
+        public static final String GEOLOCATION = "geo";
+        public static final String CREATED_AT = "created_at";
+        public static final String TEXT = "text";
+        public static final String COUNTRY = "country";
+        public static final String PLACE = "place";
+        public static final String SOURCE = "source";
+        public static final String TRUNCATED = "truncated";
+        public static final String IN_REPLY_TO_STATUS_ID = "in_reply_to_status_id";
+        public static final String IN_REPLY_TO_USER_ID = "in_reply_to_user_id";
+        public static final String IN_REPLY_TO_SCREENNAME = "in_reply_to_screen_name";
+        public static final String FAVORITED = "favorited";
+        public static final String RETWEETED = "retweeted";
+        public static final String FAVORITE_COUNT = "favorite_count";
+        public static final String RETWEET_COUNT = "retweet_count";
+        public static final String CONTRIBUTORS = "contributors";
+        public static final String LANGUAGE = "lang";
+        public static final String FILTER_LEVEL = "filter_level";
+        public static final String TIMESTAMP_MS = "timestamp_ms";
+        public static final String IS_QUOTE_STATUS = "is_quote_status";
+        // in API but not int JSON
+        public static final String SENSITIVE = "sensitive";
+        public static final String RETWEETED_BY_ME = "retweeted_by_me";
+        public static final String CURRENT_USER_RETWEET_ID = "current_user_retweet_id";
+
+        // consistency consider
+        public static final String MESSAGE = "message_text";
         public static final String LATITUDE = "latitude";
         public static final String LONGITUDE = "longitude";
-        public static final String CREATED_AT = "created_at";
-        public static final String MESSAGE = "message_text";
-
-        public static final String COUNTRY = "country";
-
         // User fields (for the sub record "user")
         public static final String SCREEN_NAME = "screen_name";
-        public static final String LANGUAGE = "language";
+        public static final String USER_PREFERRED_LANGUAGE = "user_preferred_language";
         public static final String FRIENDS_COUNT = "friends_count";
         public static final String STATUS_COUNT = "status_count";
         public static final String NAME = "name";
@@ -59,6 +82,73 @@
 
     }
 
+    public static final class Tweet_Place {
+        public static final String ID = "id";
+        public static final String URL = "url";
+        public static final String PLACE_TYPE = "place_type";
+        public static final String NAME = "name";
+        public static final String FULL_NAME = "full_name";
+        public static final String COUNTRY_CODE = "country_code";
+        public static final String COUNTRY = "country";
+        public static final String BOUNDING_BOX = "bounding_box";
+        public static final String ATTRIBUTES = "attributes";
+
+        private Tweet_Place() {
+        }
+    }
+
+    public static final class Tweet_User {
+        public static final String ID = "id";
+        public static final String NAME = "name";
+        public static final String SCREEN_NAME = "screen_name";
+        public static final String LOCATION = "location";
+        public static final String DESCRIPTION = "description";
+        public static final String CONTRIBUTORS_ENABLED = "contributors_enabled";
+        public static final String PROFILE_IMAGE_URL = "profile_image_url";
+        public static final String PROFILE_IMAGE_URL_HTTPS = "profile_image_url_https";
+        public static final String URL = "url";
+        public static final String PROTECTED = "protected";
+        public static final String FOLLOWERS_COUNT = "followers_count";
+        public static final String PROFILE_BACKGROUND_COLOR = "profile_background_color";
+        public static final String PROFILE_TEXT_COLOR = "profile_text_color";
+        public static final String PROFILE_LINK_COLOR = "profile_link_color";
+        public static final String PROFILE_SIDEBAR_FILL_COLOR = "profile_sidebar_fill_color";
+        public static final String PROFILE_SIDEBAR_BORDER_COLOR = "profile_sidebar_border_color";
+        public static final String PROFILE_USE_BACKGROUND_IMAGE = "profile_use_background_image";
+        public static final String DEFAULT_PROFILE = "default_profile";
+        public static final String DEFAULT_PROFILE_IMAGE = "default_profile_image";
+        public static final String FRIENDS_COUNT = "friends_count";
+        public static final String CREATED_AT = "CREATED_AT";
+        public static final String FAVOURITES_COUNT = "favourites_count";
+        public static final String UTC_OFFSET = "utc_offset";
+        public static final String TIME_ZONE = "time_zone";
+        public static final String PROFILE_BACKGROUND_IMAGE_URL = "profile_background_image_url";
+        public static final String PROFILE_BACKGROUND_IMAGE_URL_HTTPS = "profile_background_image_url_https";
+        public static final String PROFILE_BANNER_URL = "profile_banner_url";
+        public static final String LANG = "lang";
+        public static final String STATUSES_COUNT = "statuses_count";
+        public static final String GEO_ENABLED = "geo_enabled";
+        public static final String VERIFIED = "verified";
+        public static final String IS_TRANSLATOR = "is_translator";
+        public static final String LISTED_COUNT = "listed_count";
+        public static final String FOLLOW_REQUEST_SENT = "follow_request_sent";
+        // skip Entities, attrs in API but not in JSON is as below
+        public static final String WITHHELD_IN_COUNTRIES = "withheld_in_countries";
+        public static final String BIGGER_PROFILE_IMAGE_URL = "bigger_profile_image_url";
+        public static final String MINI_PROFILE_IMAGE_URL = "mini_profile_image_url";
+        public static final String ORIGINAL_PROFILE_IMAGE_URL = "original_profile_image_url";
+        public static final String SHOW_ALL_INLINE_MEDIA = "show_all_inline_media";
+        public static final String PROFILE_BANNER_RETINA_URL = "profile_banner_retina_url";
+        public static final String PROFILE_BANNER_IPAD_URL = "profile_banner_ipad_url";
+        public static final String PROFILE_BANNER_IPAD_RETINA_URL = "profile_banner_ipad_retina_url";
+        public static final String PROFILE_BANNER_MOBILE_URL = "profile_banner_mobile_url";
+        public static final String PROFILE_BANNER_MOBILE_RETINA_URL = "profile_banner_mobile_retina_url";
+        public static final String PROFILE_BACKGROUND_TILED = "profile_background_tiled";
+
+        private Tweet_User() {
+        }
+    }
+
     /*
         The following assumes this DDL (but ignoring the field name orders):
         create type ProcessedTweet if not exists as open {
@@ -76,5 +166,4 @@
         public static final String LOCATION = "location";
         public static final String TOPICS = "topics";
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e251f32..b666487 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -170,7 +170,7 @@
     public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
     public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
     public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
-    public static final String ALIAS_RSS_ADAPTER = "rss_feed";
+    public static final String ALIAS_RSS_ADAPTER = "rss";
     public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
     public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
     public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
@@ -223,7 +223,7 @@
     public static final String KEY_STREAM_SOURCE = "stream-source";
     public static final String EXTERNAL = "external";
     public static final String KEY_READER_FACTORY = "reader-factory";
-    public static final String READER_RSS = "rss";
+    public static final String READER_RSS = "rss_feed";
     public static final String FORMAT_CSV = "csv";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 4fb602b..70d31c0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -46,12 +46,12 @@
         public static final String LOCATION_EU = "Europe";
         public static final String LANGUAGES = "languages"; // languages to track
         public static final String TRACK = "keywords"; // terms to track
-        public static final String  FILTER_LEVEL = "filter-level";
+        public static final String FILTER_LEVEL = "filter-level";
     }
 
     public static class GeoConstants {
-        public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
-        public static final double[][] EU = new double[][]{{-29.7,36.7},{79.2,72.0}};
+        private static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+        private static final double[][] EU = new double[][] { { -29.7, 36.7 }, { 79.2, 72.0 } };
         public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
     }
 
@@ -83,24 +83,25 @@
 
         if (m.matches()) {
             String[] locationStrings = locationValue.trim().split(";\\s*");
-            locations = new double[locationStrings.length*2][2];
+            locations = new double[locationStrings.length * 2][2];
 
-            for(int i=0; i<locationStrings.length; i++) {
+            for (int i = 0; i < locationStrings.length; i++) {
                 if (locationStrings[i].contains(",")) {
                     String[] coordinatesString = locationStrings[i].split(",");
-                    for(int k=0; k < 2; k++) {
+                    for (int k = 0; k < 2; k++) {
                         for (int l = 0; l < 2; l++) {
                             try {
                                 locations[2 * i + k][l] = Double.parseDouble(coordinatesString[2 * k + l]);
                             } catch (NumberFormatException ne) {
-                                throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * k + l]);
+                                throw new AsterixException(
+                                        "Incorrect coordinate value " + coordinatesString[2 * k + l]);
                             }
                         }
                     }
                 } else if (GeoConstants.boundingBoxes.containsKey(locationStrings[i])) {
                     // Only add known locations
                     double loc[][] = GeoConstants.boundingBoxes.get(locationStrings[i]);
-                    for(int k=0; k < 2; k++) {
+                    for (int k = 0; k < 2; k++) {
                         for (int l = 0; l < 2; l++) {
                             locations[2 * i + k][l] = loc[k][l];
                         }
@@ -132,7 +133,7 @@
             if (trackValue.contains(",")) {
                 keywords = trackValue.trim().split(",\\s*");
             } else {
-                keywords = new String[] {trackValue};
+                keywords = new String[] { trackValue };
             }
             filterQuery = filterQuery.track(keywords);
         }
@@ -146,7 +147,7 @@
             if (langValue.contains(",")) {
                 languages = langValue.trim().split(",\\s*");
             } else {
-                languages = new String[] {langValue};
+                languages = new String[] { langValue };
             }
             filterQuery = filterQuery.language(languages);
         }
@@ -163,9 +164,9 @@
         }
 
         // Filtering level: none, low or medium (defaul=none)
-        if(filterQuery != null) {
+        if (filterQuery != null) {
             String filterValue = configuration.get(ConfigurationConstants.FILTER_LEVEL);
-            if (filterValue!=null) {
+            if (filterValue != null) {
                 filterQuery = filterQuery.filterLevel(filterValue);
             }
         }
@@ -188,8 +189,11 @@
                 builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
                 builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
                 LOGGER.warning(builder.toString());
-                LOGGER.warning("Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials");
-                LOGGER.warning("For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth/overview/application-owner-access-tokens");
+                LOGGER.warning(
+                        "Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials");
+                LOGGER.warning(
+                        "For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth"
+                                + "/overview/application-owner-access-tokens");
             }
         }
         Twitter twitter = tf.getInstance();
@@ -205,6 +209,7 @@
     private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
         ConfigurationBuilder cb = new ConfigurationBuilder();
         cb.setDebugEnabled(true);
+        cb.setJSONStoreEnabled(true);
         String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
         String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
         String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
index e16633e..a5af127 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.om.types;
 
+import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.IAObject;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -26,6 +27,8 @@
 
     private static final long serialVersionUID = 1L;
 
+    public static final AOrderedListType FULL_OPEN_ORDEREDLIST_TYPE = new AOrderedListType(null,"");
+
     /**
      * @param itemType
      *            if null, the list is untyped
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
index 80b13b5..febc6ad 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.om.types;
 
+import org.apache.asterix.om.base.AUnorderedList;
 import org.apache.asterix.om.base.IAObject;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -26,6 +27,8 @@
 
     private static final long serialVersionUID = 1L;
 
+    public static final AUnorderedListType FULLY_OPEN_UNORDEREDLIST_TYPE = new AUnorderedListType(null,"");
+
     /**
      * @param itemType
      *            if null, the collection is untyped
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
index 2a50506..b6c5712 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
@@ -80,6 +80,12 @@
                 return BuiltinType.ADAYTIMEDURATION;
             case UUID:
                 return BuiltinType.AUUID;
+            case RECORD:
+                return ARecordType.FULLY_OPEN_RECORD_TYPE;
+            case UNORDEREDLIST:
+                return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
+            case ORDEREDLIST:
+                return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
             default:
                 throw new AsterixException("Typetag " + typeTag + " is not a built-in type");
         }