Improve Schemaless Record Serializer Deserializer.

Change-Id: I25c5054a361128a3bee4241d7b9b40da7e61373f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1098
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index 9e2241f..e88962a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -21,7 +21,9 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An interface that provides an extension mechanism to extend a language with additional statements
@@ -44,5 +46,5 @@
      * @throws Exception
      */
     void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws Exception;
+            IHyracksClientConnection hcc) throws HyracksDataException, AlgebricksException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java
index d1d6e0c..57a3c79 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixExtension.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.config;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang3.ObjectUtils;
@@ -32,6 +33,11 @@
         this.args = args;
     }
 
+    public AsterixExtension(String className) {
+        this.className = className;
+        this.args = Collections.emptyList();
+    }
+
     public List<Pair<String, String>> getArgs() {
         return args;
     }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
index 94144c6..a604315 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
@@ -68,7 +68,7 @@
                 parseLiteral((LiteralExpr) expr, serialized);
                 break;
             case RECORD_CONSTRUCTOR_EXPRESSION:
-                parseRecord((RecordConstructor) expr, serialized);
+                parseRecord((RecordConstructor) expr, serialized, true);
                 break;
             case LIST_CONSTRUCTOR_EXPRESSION:
                 parseList((ListConstructor) expr, serialized);
@@ -82,7 +82,7 @@
         }
     }
 
-    public static void parseRecord(RecordConstructor recordValue, ArrayBackedValueStorage serialized)
+    public static void parseRecord(RecordConstructor recordValue, ArrayBackedValueStorage serialized, boolean tagged)
             throws HyracksDataException {
         AMutableString fieldNameString = new AMutableString(null);
         ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
@@ -112,7 +112,7 @@
             parseExpression(fb.getRightExpr(), fieldValue);
             recordBuilder.addField(fieldName, fieldValue);
         }
-        recordBuilder.write(serialized.getDataOutput(), true);
+        recordBuilder.write(serialized.getDataOutput(), tagged);
     }
 
     private static void parseList(ListConstructor valueExpr, ArrayBackedValueStorage serialized)
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
index c922054..c69c89e 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.dataflow.data.nontagged.serde;
 
 import java.io.DataInput;
@@ -32,13 +31,14 @@
 import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -153,7 +153,7 @@
                     IAObject[] mergedFields = mergeFields(closedFields, openFields);
                     return new ARecord(mergedRecordType, mergedFields);
                 } else {
-                    return new ARecord(this.recordType, openFields);
+                    return new ARecord(openPartRecType, openFields);
                 }
             } else {
                 return new ARecord(this.recordType, closedFields);
@@ -184,10 +184,32 @@
             }
             recordBuilder.write(out, writeTypeTag);
         } else {
-            throw new NotImplementedException("Serializer for schemaless records is not implemented.");
+            serializeSchemalessRecord(instance, out, writeTypeTag);
         }
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static void serializeSchemalessRecord(ARecord record, DataOutput dataOutput, boolean writeTypeTag)
+            throws HyracksDataException {
+        ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(BuiltinType.ASTRING);
+        RecordBuilder confRecordBuilder = new RecordBuilder();
+        confRecordBuilder.reset(ARecordType.FULLY_OPEN_RECORD_TYPE);
+        ArrayBackedValueStorage fieldNameBytes = new ArrayBackedValueStorage();
+        ArrayBackedValueStorage fieldValueBytes = new ArrayBackedValueStorage();
+        for (int i = 0; i < record.getType().getFieldNames().length; i++) {
+            String fieldName = record.getType().getFieldNames()[i];
+            fieldValueBytes.reset();
+            fieldNameBytes.reset();
+            stringSerde.serialize(new AString(fieldName), fieldNameBytes.getDataOutput());
+            ISerializerDeserializer valueSerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(record.getType().getFieldTypes()[i]);
+            valueSerde.serialize(record.getValueByPos(i), fieldValueBytes.getDataOutput());
+            confRecordBuilder.addField(fieldNameBytes, fieldValueBytes);
+        }
+        confRecordBuilder.write(dataOutput, writeTypeTag);
+    }
+
     private IAObject[] mergeFields(IAObject[] closedFields, IAObject[] openFields) {
         IAObject[] fields = new IAObject[closedFields.length + openFields.length];
         int i = 0;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ARecord.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ARecord.java
index 79dd63c..c35f000 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ARecord.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ARecord.java
@@ -26,6 +26,7 @@
 import org.json.JSONObject;
 
 public class ARecord implements IAObject {
+    public static final ARecord EMPTY_OPEN_RECORD = new ARecord(ARecordType.FULLY_OPEN_RECORD_TYPE, new IAObject[] {});
 
     protected ARecordType type;
     protected IAObject[] fields;