Fix ASTERIXDB-1609 and OrderedList bug in TweetParser

1. For ASTERIXDB-1609, add UNION type check in writeField, and add one
more case for orderedList.
2. For OrderedList bug, change UnorderedListBuilder to
OrderedListBuilder.

Change-Id: Ia27148cb10206b93dabf7655aed68f3004f96dfd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1339
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 8d483dc..d23d490 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -22,14 +22,16 @@
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilderFactory;
-import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.om.base.AMutablePoint;
 import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.container.IObjectPool;
@@ -60,51 +62,80 @@
         aPoint = new AMutablePoint(0, 0);
     }
 
-    private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException, JSONException {
+    private void parseJSONArray(JSONArray jArray, DataOutput output, AOrderedListType orderedListType)
+            throws IOException, JSONException {
         ArrayBackedValueStorage itemBuffer = getTempBuffer();
-        UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
+        OrderedListBuilder orderedList = (OrderedListBuilder) getOrderedListBuilder();
 
-        unorderedListBuilder.reset(null);
+        orderedList.reset(orderedListType);
         for (int iter1 = 0; iter1 < jArray.length(); iter1++) {
             itemBuffer.reset();
-            if (writeField(jArray.get(iter1), null, itemBuffer.getDataOutput())) {
-                unorderedListBuilder.addItem(itemBuffer);
+            if (writeField(jArray.get(iter1), orderedListType == null ? null : orderedListType.getItemType(),
+                    itemBuffer.getDataOutput())) {
+                orderedList.addItem(itemBuffer);
             }
         }
-        unorderedListBuilder.write(output, true);
+        orderedList.write(output, true);
     }
 
-    private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out) throws IOException, JSONException {
+    private boolean writeFieldWithFieldType(Object fieldObj, IAType fieldType, DataOutput out)
+            throws HyracksDataException {
         boolean writeResult = true;
-        if (fieldType != null) {
-            switch (fieldType.getTypeTag()) {
+        IAType chkFieldType;
+        chkFieldType = fieldType instanceof AUnionType ? ((AUnionType) fieldType).getActualType() : fieldType;
+        try {
+            switch (chkFieldType.getTypeTag()) {
                 case STRING:
-                    out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                    out.write(fieldType.getTypeTag().serialize());
                     utf8Writer.writeUTF8(fieldObj.toString(), out);
                     break;
                 case INT64:
-                    aInt64.setValue((long) fieldObj);
+                    out.write(fieldType.getTypeTag().serialize());
+                    if (fieldObj instanceof Integer) {
+                        out.writeLong(((Integer) fieldObj).longValue());
+                    } else {
+                        out.writeLong((Long) fieldObj);
+                    }
                     int64Serde.serialize(aInt64, out);
                     break;
                 case INT32:
-                    out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                    out.write(fieldType.getTypeTag().serialize());
                     out.writeInt((Integer) fieldObj);
                     break;
                 case DOUBLE:
-                    out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                    out.write(fieldType.getTypeTag().serialize());
                     out.writeDouble((Double) fieldObj);
                     break;
                 case BOOLEAN:
-                    out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                    out.write(fieldType.getTypeTag().serialize());
                     out.writeBoolean((Boolean) fieldObj);
                     break;
                 case RECORD:
-                    writeRecord((JSONObject) fieldObj, out, (ARecordType) fieldType);
+                    if (((JSONObject) fieldObj).length() != 0) {
+                        writeResult = writeRecord((JSONObject) fieldObj, out, (ARecordType) chkFieldType);
+                    } else {
+                        writeResult = false;
+                    }
+                    break;
+                case ORDEREDLIST:
+                    if (((JSONArray) fieldObj).length() != 0) {
+                        parseJSONArray((JSONArray) fieldObj, out, (AOrderedListType) chkFieldType);
+                    } else {
+                        writeResult = false;
+                    }
                     break;
                 default:
                     writeResult = false;
             }
-        } else {
+        } catch (IOException | JSONException e) {
+            throw new HyracksDataException(e);
+        }
+        return writeResult;
+    }
+
+    private boolean writeFieldWithoutFieldType(Object fieldObj, DataOutput out) throws HyracksDataException {
+        boolean writeResult = true;
+        try {
             if (fieldObj == JSONObject.NULL) {
                 nullSerde.serialize(ANull.NULL, out);
             } else if (fieldObj instanceof Integer) {
@@ -124,21 +155,28 @@
                 utf8Writer.writeUTF8((String) fieldObj, out);
             } else if (fieldObj instanceof JSONArray) {
                 if (((JSONArray) fieldObj).length() != 0) {
-                    parseUnorderedList((JSONArray) fieldObj, out);
+                    parseJSONArray((JSONArray) fieldObj, out, null);
                 } else {
                     writeResult = false;
                 }
             } else if (fieldObj instanceof JSONObject) {
                 if (((JSONObject) fieldObj).length() != 0) {
-                    writeRecord((JSONObject) fieldObj, out, null);
+                    writeResult = writeRecord((JSONObject) fieldObj, out, null);
                 } else {
                     writeResult = false;
                 }
             }
+        } catch (IOException | JSONException e) {
+            throw new HyracksDataException(e);
         }
         return writeResult;
     }
 
+    private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out) throws HyracksDataException {
+        return fieldType == null ? writeFieldWithoutFieldType(fieldObj, out)
+                : writeFieldWithFieldType(fieldObj, fieldType, out);
+    }
+
     private int checkAttrNameIdx(String[] nameList, String name) {
         int idx = 0;
         if (nameList != null) {
@@ -152,11 +190,13 @@
         return -1;
     }
 
-    public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType) throws IOException, JSONException {
+    public boolean writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType)
+            throws IOException, JSONException {
         IAType[] curTypes = null;
         String[] curFNames = null;
         int fieldN;
         int attrIdx;
+        boolean writeRecord = false;
 
         ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
         ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
@@ -177,7 +217,8 @@
                 fieldValueBuffer.reset();
                 DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
                 if (obj.isNull(curFNames[iter1])) {
-                    if (curRecType.isClosedField(curFNames[iter1])) {
+                    if (curRecType.getFieldType(curFNames[iter1]) != null
+                            && !(curRecType.getFieldType(curFNames[iter1]) instanceof AUnionType)) {
                         throw new HyracksDataException("Closed field " + curFNames[iter1] + " has null value.");
                     } else {
                         continue;
@@ -185,12 +226,12 @@
                 } else {
                     if (writeField(obj.get(curFNames[iter1]), curTypes[iter1], fieldOutput)) {
                         recBuilder.addField(iter1, fieldValueBuffer);
+                        writeRecord = true;
                     }
                 }
             }
         } else {
             //open record type
-            int closedFieldCount = 0;
             IAType curFieldType = null;
             for (String attrName : JSONObject.getNames(obj)) {
                 if (obj.isNull(attrName) || obj.length() == 0) {
@@ -204,29 +245,29 @@
                 fieldNameBuffer.reset();
                 DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
                 if (writeField(obj.get(attrName), curFieldType, fieldOutput)) {
+                    writeRecord = true;
                     if (attrIdx == -1) {
                         aString.setValue(attrName);
                         stringSerde.serialize(aString, fieldNameBuffer.getDataOutput());
                         recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
                     } else {
                         recBuilder.addField(attrIdx, fieldValueBuffer);
-                        closedFieldCount++;
                     }
                 }
             }
-            if (curRecType != null && closedFieldCount < curFNames.length) {
-                throw new HyracksDataException("Non-null field is null");
-            }
         }
-        recBuilder.write(out, true);
+        if (writeRecord) {
+            recBuilder.write(out, true);
+        }
+        return writeRecord;
     }
 
     private IARecordBuilder getRecordBuilder() {
         return recordBuilderPool.allocate(ATypeTag.RECORD);
     }
 
-    private IAsterixListBuilder getUnorderedListBuilder() {
-        return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
+    private IAsterixListBuilder getOrderedListBuilder() {
+        return listBuilderPool.allocate(ATypeTag.ORDEREDLIST);
     }
 
     private ArrayBackedValueStorage getTempBuffer() {