support dynamic recursive cast for record type

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_opentype@321 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o-recursive.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o-recursive.aql
new file mode 100644
index 0000000..5b92f28
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o-recursive.aql
@@ -0,0 +1,57 @@
+/* 
+ * Test case Name  : opentype-o2c-recursive.aql
+ * Description     : verify the static casting of nest record constants 
+ * Expected Result : Success
+ */
+
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+
+create type AddressType as open{
+  street: string,
+  city: string
+}
+
+create type testtype as closed {
+  name: string,
+  id: string,
+  address: AddressType?
+}
+
+create type testtype2 as open {
+  name: string,
+  id: string
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+create dataset testds2(testtype2) partitioned by key id;
+
+insert into dataset testds (
+{ "id": "001", "name": "Person One", "address": {"street": "3019 DBH",  "city": "Irvine", "zip": 92697} }
+);
+
+insert into dataset testds (
+{ "id": "002", "name": "Person Two" }
+);
+
+insert into dataset testds (
+{ "id": "003", "name": "Person Three", "address": {"street": "2019 DBH",  "city": "Irvine"} }
+);
+
+insert into dataset testds (
+{ "id": "004", "name": "Person Four", "address": {"street": "1019 DBH",  "city": "irvine", "property": {"zip": 92697, "review": "positive" }  } }
+);
+
+insert into dataset testds2 (
+ for $d in dataset("testds") 
+	return $d
+);
+
+write output to nc1:"rttest/dml_opentype-c2o-recursive.adm";
+
+for $d in dataset("testds2") 
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c-recursive.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c-recursive.aql
new file mode 100644
index 0000000..493b2cc
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c-recursive.aql
@@ -0,0 +1,57 @@
+/* 
+ * Test case Name  : opentype-o2c-recursive.aql
+ * Description     : verify the static casting of nest record constants 
+ * Expected Result : Success
+ */
+
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+
+create type AddressType as open{
+  street: string,
+  city: string
+}
+
+create type testtype as open {
+  name: string,
+  id: string
+}
+
+create type testtype2 as closed {
+  name: string,
+  id: string,
+  address: AddressType?
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+create dataset testds2(testtype2) partitioned by key id;
+
+insert into dataset testds (
+{ "id": "001", "name": "Person One", "address": {"street": "3019 DBH",  "city": "Irvine", "zip": 92697} }
+);
+
+insert into dataset testds (
+{ "id": "002", "name": "Person Two" }
+);
+
+insert into dataset testds (
+{ "id": "003", "name": "Person Three", "address": {"street": "2019 DBH",  "city": "Irvine"} }
+);
+
+insert into dataset testds (
+{ "id": "004", "name": "Person Four", "address": {"street": "1019 DBH",  "city": "irvine", "property": {"zip": 92697, "review": "positive" }  } }
+);
+
+insert into dataset testds2 (
+ for $d in dataset("testds") 
+	return $d
+);
+
+write output to nc1:"rttest/dml_opentype-o2c-recursive.adm";
+
+for $d in dataset("testds2") 
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/nestrecords/nestrecord.aql b/asterix-app/src/test/resources/runtimets/queries/nestrecords/nestrecord.aql
index 6c4c87b..c0b5e06 100644
--- a/asterix-app/src/test/resources/runtimets/queries/nestrecords/nestrecord.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/nestrecords/nestrecord.aql
@@ -1,6 +1,6 @@
 /* 
- * Test case Name  : opentype-closed-optional.aql
- * Description     : verify that closed type can have optional fields
+ * Test case Name  : nestrecord.aql
+ * Description     : verify the static casting of nest record constants 
  * Expected Result : Success
  */
 
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o-recursive.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o-recursive.adm
new file mode 100644
index 0000000..8088fff
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-c2o-recursive.adm
@@ -0,0 +1,4 @@
+{ "name": "Person One", "id": "001", "address": { "street": "3019 DBH", "city": "Irvine", "zip": 92697 } }
+{ "name": "Person Two", "id": "002", "address": null }
+{ "name": "Person Three", "id": "003", "address": { "street": "2019 DBH", "city": "Irvine" } }
+{ "name": "Person Four", "id": "004", "address": { "street": "1019 DBH", "city": "irvine", "property": { "zip": 92697, "review": "positive" } } }
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c-recursive.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c-recursive.adm
new file mode 100644
index 0000000..8088fff
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c-recursive.adm
@@ -0,0 +1,4 @@
+{ "name": "Person One", "id": "001", "address": { "street": "3019 DBH", "city": "Irvine", "zip": 92697 } }
+{ "name": "Person Two", "id": "002", "address": null }
+{ "name": "Person Three", "id": "003", "address": { "street": "2019 DBH", "city": "Irvine" } }
+{ "name": "Person Four", "id": "004", "address": { "street": "1019 DBH", "city": "irvine", "property": { "zip": 92697, "review": "positive" } } }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
index 5494669..7e3dbdb 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
@@ -33,9 +33,9 @@
     private ARecordType recordType;
     private int numberOfSchemaFields = 0;
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings("rawtypes")
     private ISerializerDeserializer serializers[];
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings("rawtypes")
     private ISerializerDeserializer deserializers[];
 
     private ARecordSerializerDeserializer() {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/ARecordAccessor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/ARecordAccessor.java
index e05c67a..bad1acb 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/ARecordAccessor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/ARecordAccessor.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.accessors.base.DefaultOpenFieldType;
 import edu.uci.ics.asterix.runtime.accessors.base.IBinaryAccessor;
 import edu.uci.ics.asterix.runtime.accessors.visitor.IBinaryAccessorVisitor;
 import edu.uci.ics.asterix.runtime.util.ResettableByteArrayOutputStream;
@@ -72,11 +73,6 @@
     private int start;
     private int len;
 
-    // nested open field rec type
-    // private static ARecordType nestedOpenRecType = new
-    // ARecordType("nested-open", new String[] {}, new IAType[] {},
-    // true);
-
     public ARecordAccessor(ARecordType inputType) {
         this.inputRecType = inputType;
         IAType[] fieldTypes = inputType.getFieldTypes();
@@ -90,6 +86,12 @@
             for (int i = 0; i < numberOfSchemaFields; i++) {
                 ATypeTag ftypeTag = fieldTypes[i].getTypeTag();
 
+                if (fieldTypes[i].getTypeTag() == ATypeTag.UNION
+                        && NonTaggedFormatUtil.isOptionalField((AUnionType) fieldTypes[i]))
+                    // optional field: add the embedded non-null type tag
+                    ftypeTag = ((AUnionType) fieldTypes[i]).getUnionList()
+                            .get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+
                 // add type tag Reference
                 int tagStart = typeBos.size();
                 typeDos.writeByte(ftypeTag.serialize());
@@ -124,10 +126,10 @@
     private void reset() {
         typeBos.setByteArray(typeBuffer, closedPartTypeInfoSize);
         dataBos.setByteArray(dataBuffer, 0);
-        //reset the allocator
+        // reset the allocator
         allocator.reset();
 
-        //clean up the returned containers
+        // clean up the returned containers
         for (int i = fieldNames.size() - 1; i >= numberOfSchemaFields; i--)
             fieldNames.remove(i);
         for (int i = fieldTypeTags.size() - 1; i >= numberOfSchemaFields; i--)
@@ -191,10 +193,12 @@
                     IAType[] fieldTypes = inputRecType.getFieldTypes();
                     int fieldValueLength = 0;
 
+                    IAType fieldType = fieldTypes[fieldNumber];
                     if (fieldTypes[fieldNumber].getTypeTag() == ATypeTag.UNION) {
                         if (NonTaggedFormatUtil.isOptionalField((AUnionType) fieldTypes[fieldNumber])) {
-                            typeTag = ((AUnionType) fieldTypes[fieldNumber]).getUnionList()
-                                    .get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+                            fieldType = ((AUnionType) fieldTypes[fieldNumber]).getUnionList().get(
+                                    NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
+                            typeTag = fieldType.getTypeTag();
                             fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffsets[fieldNumber],
                                     typeTag, false);
                         }
@@ -208,7 +212,7 @@
                     dataDos.writeByte(typeTag.serialize());
                     dataDos.write(b, fieldOffsets[fieldNumber], fieldValueLength);
                     int fend = dataBos.size();
-                    IBinaryAccessor fieldValue = allocator.allocateFieldValue(fieldTypes[fieldNumber]);
+                    IBinaryAccessor fieldValue = allocator.allocateFieldValue(fieldType);
                     fieldValue.reset(dataBuffer, fstart, fend - fstart);
                     fieldValues.add(fieldValue);
                 }
@@ -238,7 +242,14 @@
 
                     // set the field value (already including type tag)
                     fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, typeTag, true) + 1;
-                    IBinaryAccessor fieldValueAccessor = allocator.allocateFieldName();
+
+                    // allocate
+                    IBinaryAccessor fieldValueAccessor;
+
+                    if (typeTag == ATypeTag.RECORD)
+                        fieldValueAccessor = allocator.allocateFieldValue(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+                    else
+                        fieldValueAccessor = allocator.allocateFieldValue(null);
                     fieldValueAccessor.reset(b, fieldOffset, fieldValueLength);
                     fieldValues.add(fieldValueAccessor);
                     fieldOffset += fieldValueLength;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/AccessorAllocator.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/AccessorAllocator.java
index a3fd0d7..c177b1f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/AccessorAllocator.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/AccessorAllocator.java
@@ -24,7 +24,9 @@
     }
 
     public IBinaryAccessor allocateFieldValue(IAType type) {
-        if (type.getTypeTag().equals(ATypeTag.RECORD))
+        if(type == null)
+            return flatArtifactAllocator.allocate(null);
+        else if (type.getTypeTag().equals(ATypeTag.RECORD))
             return nestedRecValueAllocator.allocate(type);
         else if (type.getTypeTag().equals(ATypeTag.UNORDEREDLIST) || type.getTypeTag().equals(ATypeTag.ORDEREDLIST))
             return nestedListValueAllocator.allocate(type);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/base/DefaultOpenFieldType.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/base/DefaultOpenFieldType.java
new file mode 100644
index 0000000..b7eef18
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/base/DefaultOpenFieldType.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.asterix.runtime.accessors.base;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class DefaultOpenFieldType {
+
+    // nested open field rec type
+    public static ARecordType NESTED_OPEN_RECORD_TYPE = new ARecordType("nested-open", new String[] {}, new IAType[] {}, true);
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/cast/ARecordCaster.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/cast/ARecordCaster.java
index a3f31bb..8face9c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/cast/ARecordCaster.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/accessors/cast/ARecordCaster.java
@@ -27,10 +27,12 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.asterix.runtime.accessors.AFlatValueAccessor;
 import edu.uci.ics.asterix.runtime.accessors.ARecordAccessor;
+import edu.uci.ics.asterix.runtime.accessors.base.DefaultOpenFieldType;
 import edu.uci.ics.asterix.runtime.accessors.base.IBinaryAccessor;
 import edu.uci.ics.asterix.runtime.util.ResettableByteArrayOutputStream;
 import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
@@ -238,10 +240,17 @@
             }
             IAType fType = cachedReqType.getFieldTypes()[i];
             nestedVisitorArg.second = fType;
-            
-            //recursively casting, the result of casting can always be thought as flat
+
+            // recursively casting, the result of casting can always be thought
+            // as flat
+            if (optionalFields[i]) {
+                nestedVisitorArg.second = ((AUnionType) fType).getUnionList().get(
+                        NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
+            }
             field.accept(visitor, nestedVisitorArg);
             recBuilder.addField(i, nestedVisitorArg.first);
+            //reset the req type
+            nestedVisitorArg.second = null;
         }
 
         // write the open part
@@ -249,7 +258,17 @@
             if (openFields[i]) {
                 IBinaryAccessor name = fieldNames.get(i);
                 IBinaryAccessor field = fieldValues.get(i);
-                recBuilder.addField(name, field);
+                IBinaryAccessor fieldTypeTag = fieldTypeTags.get(i);
+
+                ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                        .deserialize(fieldTypeTag.getBytes()[fieldTypeTag.getStartIndex()]);
+                if (typeTag.equals(ATypeTag.RECORD))
+                    nestedVisitorArg.second = DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE;
+                field.accept(visitor, nestedVisitorArg);
+                recBuilder.addField(name, nestedVisitorArg.first);
+                
+                //reset the req type
+                nestedVisitorArg.second = null;
             }
         }
         recBuilder.write(output, true);