1. fix optional field related issues 2. add static casting for constant records

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_opentype@280 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
index 4097d3a..08b50cc 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
@@ -4,11 +4,19 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -17,6 +25,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -100,6 +109,7 @@
                                 inputRecordType);
                         changed &= !requiredRecordType.equals(inputRecordType);
                         if (changed) {
+                            staticTypeCast(funcExpr, requiredRecordType, inputRecordType);
                             List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
                             int openPartStart = requiredRecordType.getFieldTypes().length * 2;
                             for (int j = openPartStart; j < args.size(); j++) {
@@ -121,4 +131,127 @@
         } while (currentOperator != null);
         return changed;
     }
+
+    private void staticTypeCast(ScalarFunctionCallExpression func, ARecordType reqType, ARecordType inputType) {
+        IAType[] reqFieldTypes = reqType.getFieldTypes();
+        String[] reqFieldNames = reqType.getFieldNames();
+        IAType[] inputFieldTypes = inputType.getFieldTypes();
+        String[] inputFieldNames = inputType.getFieldNames();
+
+        int[] fieldPermutation = new int[reqFieldTypes.length];
+        boolean[] nullFields = new boolean[reqFieldTypes.length];
+        boolean[] openFields = new boolean[inputFieldTypes.length];
+
+        for (int i = 0; i < nullFields.length; i++)
+            nullFields[i] = false;
+        for (int i = 0; i < openFields.length; i++)
+            openFields[i] = true;
+        for (int i = 0; i < fieldPermutation.length; i++)
+            fieldPermutation[i] = -1;
+
+        // forward match: match from actual to required
+        boolean matched = false;
+        for (int i = 0; i < inputFieldNames.length; i++) {
+            String fieldName = inputFieldNames[i];
+            IAType fieldType = inputFieldTypes[i];
+            matched = false;
+            for (int j = 0; j < reqFieldNames.length; j++) {
+                String reqFieldName = reqFieldNames[j];
+                IAType reqFieldType = reqFieldTypes[j];
+                if (fieldName.equals(reqFieldName)) {
+                    if (fieldType.equals(reqFieldType)) {
+                        fieldPermutation[j] = i;
+                        openFields[i] = false;
+                        matched = true;
+                        break;
+                    }
+
+                    // match the optional field
+                    if (reqFieldType.getTypeTag() == ATypeTag.UNION
+                            && NonTaggedFormatUtil.isOptionalField((AUnionType) reqFieldType)) {
+                        IAType itemType = ((AUnionType) reqFieldType).getUnionList().get(
+                                NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
+                        if (fieldType.equals(BuiltinType.ANULL) || fieldType.equals(itemType)) {
+                            fieldPermutation[j] = i;
+                            openFields[i] = false;
+                            matched = true;
+                            break;
+                        }
+                    }
+                }
+            }
+            if (matched)
+                continue;
+            // the input has extra fields
+            if (!reqType.isOpen())
+                throw new IllegalStateException("static type mismatch: including extra closed fields");
+        }
+
+        // backward match: match from required to actual
+        for (int i = 0; i < reqFieldNames.length; i++) {
+            String reqFieldName = reqFieldNames[i];
+            IAType reqFieldType = reqFieldTypes[i];
+            matched = false;
+            for (int j = 0; j < inputFieldNames.length; j++) {
+                String fieldName = inputFieldNames[j];
+                IAType fieldType = inputFieldTypes[j];
+                if (fieldName.equals(reqFieldName)) {
+                    if (fieldType.equals(reqFieldType)) {
+                        matched = true;
+                        break;
+                    }
+
+                    // match the optional field
+                    if (reqFieldType.getTypeTag() == ATypeTag.UNION
+                            && NonTaggedFormatUtil.isOptionalField((AUnionType) reqFieldType)) {
+                        IAType itemType = ((AUnionType) reqFieldType).getUnionList().get(
+                                NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
+                        if (fieldType.equals(BuiltinType.ANULL) || fieldType.equals(itemType)) {
+                            matched = true;
+                            break;
+                        }
+                    }
+                }
+            }
+            if (matched)
+                continue;
+
+            IAType t = reqFieldTypes[i];
+            if (t.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t)) {
+                // add a null field
+                nullFields[i] = true;
+            } else {
+                // no matched field in the input for a required closed field
+                throw new IllegalStateException("static type mismatch: miss a required closed field");
+            }
+        }
+
+        List<Mutable<ILogicalExpression>> arguments = func.getArguments();
+        List<Mutable<ILogicalExpression>> argumentsClone = new ArrayList<Mutable<ILogicalExpression>>();
+        argumentsClone.addAll(arguments);
+        arguments.clear();
+        // re-order the closed part and fill in null fields
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            int pos = fieldPermutation[i];
+            if (pos >= 0) {
+                arguments.add(argumentsClone.get(2 * pos));
+                arguments.add(argumentsClone.get(2 * pos + 1));
+            }
+            if (nullFields[i]) {
+                // add a null field
+                arguments.add(new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
+                        new AString(reqFieldNames[i])))));
+                arguments.add(new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
+                        ANull.NULL))));
+            }
+        }
+
+        // add the open part
+        for (int i = 0; i < openFields.length; i++) {
+            if (openFields[i]) {
+                arguments.add(argumentsClone.get(2 * i));
+                arguments.add(argumentsClone.get(2 * i + 1));
+            }
+        }
+    }
 }
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_01.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_01.aql
index 01e1845..7a3cce2 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_01.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_01.aql
@@ -24,7 +24,7 @@
 let $z:=3
 return {
 	"l_orderkey": $x,
-	"l_partkey": $y,
+	"l_linenumber": $y,
 	"l_suppkey": $z
 }
 );
@@ -35,7 +35,7 @@
 let $z:=4
 return {
 	"l_orderkey": $x,
-	"l_partkey": $y,
+	"l_linenumber": $y,
 	"l_suppkey": $z
 }
 );
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_02.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_02.aql
index 2212eb9..04a5043 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_02.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset-with-index_02.aql
@@ -49,7 +49,7 @@
 	where $l.l_orderkey<10
 	return {
 		"l_orderkey": $l.l_orderkey,
-		"l_partkey": $l.l_linenumber,
+		"l_linenumber": $l.l_linenumber,
 		"l_suppkey": $l.l_partkey
 	}
 );
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset_01.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset_01.aql
index ac41ec7..1f211e8 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset_01.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert-into-loaded-dataset_01.aql
@@ -21,7 +21,7 @@
 let $z:=3
 return {
 	"l_orderkey": $x,
-	"l_partkey": $y,
+	"l_linenumber": $y,
 	"l_suppkey": $z
 }
 );
@@ -32,7 +32,7 @@
 let $z:=4
 return {
 	"l_orderkey": $x,
-	"l_partkey": $y,
+	"l_linenumber": $y,
 	"l_suppkey": $z
 }
 );
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert.aql
index 7c38a64..2b4622f 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert.aql
@@ -47,7 +47,7 @@
 	where $l.l_orderkey<10
 	return {
 		"l_orderkey": $l.l_orderkey,
-		"l_partkey": $l.l_linenumber,
+		"l_linenumber": $l.l_linenumber,
 		"l_suppkey": $l.l_partkey
 	}
 );
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert_less_nc.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert_less_nc.aql
index 0fc544d..7e133ae 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert_less_nc.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert_less_nc.aql
@@ -46,7 +46,7 @@
 	where $l.l_orderkey<1000
 	return {
 		"l_orderkey": $l.l_orderkey,
-		"l_partkey": $l.l_linenumber,
+		"l_linenumber": $l.l_linenumber,
 		"l_suppkey": $l.l_partkey
 	}
 );
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-closed-optional.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-closed-optional.aql
new file mode 100644
index 0000000..cc3b622
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-closed-optional.aql
@@ -0,0 +1,24 @@
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use dataverse testdv2;
+
+create type testtype as closed {
+  name: string ?,
+  id: string
+}
+
+create dataset testds(testtype) partitioned by key id;
+
+insert into dataset testds (
+{ "id": "001", "name": "Person One"}
+);
+
+insert into dataset testds (
+{ "id": "002"}
+);
+
+write output to nc1:"rttest/dml_opentype-closed-optional.adm";
+
+for $d in dataset("testds") 
+order by $d.id
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-noexpand.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-noexpand.aql
index 43aa5af..f8b4880 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-noexpand.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-noexpand.aql
@@ -3,14 +3,18 @@
 use dataverse testdv2;
 
 create type testtype as open {
-  name: string,
+  name: string ?,
   id: string
 }
 
 create dataset testds(testtype) partitioned by key id;
 
 insert into dataset testds (
-{ "name": "Person One",  "id": "001"}
+{ "id": "001", "name": "Person One"}
+);
+
+insert into dataset testds (
+{ "id": "002"}
 );
 
 write output to nc1:"rttest/dml_opentype-noexpand.adm";
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
index 832b118..d5c2195 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
@@ -10,7 +10,7 @@
 create type testtype2 as closed {
   id: string,
   name: string,
-  hobby: string
+  hobby: string?
 }
 
 create dataset testds(testtype) partitioned by key id;
diff --git a/asterix-app/src/test/resources/runtimets/queries/failure/insert.aql b/asterix-app/src/test/resources/runtimets/queries/failure/insert.aql
index 3f1daad..a0ce842 100644
--- a/asterix-app/src/test/resources/runtimets/queries/failure/insert.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/failure/insert.aql
@@ -47,7 +47,7 @@
 	where $l.l_orderkey<1000
 	return {
 		"l_orderkey": $l.l_orderkey,
-		"l_partkey": $l.l_linenumber,
+		"l_linenumber": $l.l_linenumber,
 		"l_suppkey": $l.l_partkey
 	}
 );
@@ -58,7 +58,7 @@
 	die after 1000
 	return {
 		"l_orderkey": $l.l_orderkey,
-		"l_partkey": $l.l_linenumber,
+		"l_linenumber": $l.l_linenumber,
 		"l_suppkey": $l.l_partkey
 	}
 );
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-closed-optional.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-closed-optional.adm
new file mode 100644
index 0000000..78e8d93
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-closed-optional.adm
@@ -0,0 +1,2 @@
+{ "name": "Person One", "id": "001" }
+{ "name": null, "id": "002" }
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-noexpand.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-noexpand.adm
index e6dc584..78e8d93 100644
--- a/asterix-app/src/test/resources/runtimets/results/dml/opentype-noexpand.adm
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-noexpand.adm
@@ -1 +1,2 @@
 { "name": "Person One", "id": "001" }
+{ "name": null, "id": "002" }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/ARecordPrinterFactory.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/ARecordPrinterFactory.java
index 184a810..fafe320 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/ARecordPrinterFactory.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/ARecordPrinterFactory.java
@@ -123,7 +123,7 @@
                                     fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b,
                                             fieldOffsets[fieldNumber], tag, false);
                                     fieldPrinters[fieldNumber].print(b, fieldOffsets[fieldNumber] - 1,
-                                            fieldOffsets[fieldNumber], ps);
+                                            fieldValueLength, ps);
                                 }
                             } else {
                                 tag = fieldTypes[fieldNumber].getTypeTag();
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
index 3fcf1c8..6a17a46 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -26,6 +27,14 @@
     public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
             IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+
+        /**
+         * if type has been top-down propagated, use the enforced type
+         */
+        ARecordType type = TypeComputerUtilities.getRequiredType(f);
+        if (type != null)
+            return type;
+
         int n = f.getArguments().size() / 2;
         String[] fieldNames = new String[n];
         IAType[] fieldTypes = new IAType[n];
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
index 08d6876..afd11de 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
@@ -22,6 +22,7 @@
 
     // describe fields (open or not) in the input records
     private boolean[] openFields;
+    private boolean[] optionalFields;
 
     private List<SimpleValueReference> reqFieldNames = new ArrayList<SimpleValueReference>();
     private List<SimpleValueReference> reqFieldTypeTags = new ArrayList<SimpleValueReference>();
@@ -33,6 +34,7 @@
 
     private RecordBuilder recBuilder = new RecordBuilder();
     private SimpleValueReference nullReference = new SimpleValueReference();
+    private SimpleValueReference nullTypeTag = new SimpleValueReference();
 
     public ARecordCaster() {
         try {
@@ -42,6 +44,10 @@
             nullWriter.writeNull(dos);
             int end = bos.size();
             nullReference.reset(buffer, start, end - start);
+            start = bos.size();
+            dos.write(ATypeTag.NULL.serialize());
+            end = bos.size();
+            nullTypeTag.reset(buffer, start, end);
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
@@ -81,6 +87,9 @@
         IAType[] fieldTypes = reqType.getFieldTypes();
         String[] fieldNames = reqType.getFieldNames();
         fieldPermutation = new int[numSchemaFields];
+        optionalFields = new boolean[numSchemaFields];
+        for (int i = 0; i < optionalFields.length; i++)
+            optionalFields[i] = false;
 
         bos.setByteArray(buffer, nullReference.getStartIndex() + nullReference.getLength());
         for (int i = 0; i < numSchemaFields; i++) {
@@ -88,6 +97,13 @@
             String fname = fieldNames[i];
 
             // add type tag pointable
+            if (fieldTypes[i].getTypeTag() == ATypeTag.UNION
+                    && NonTaggedFormatUtil.isOptionalField((AUnionType) fieldTypes[i])) {
+                // optional field: add the embedded non-null type tag
+                ftypeTag = ((AUnionType) fieldTypes[i]).getUnionList()
+                        .get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+                optionalFields[i] = true;
+            }
             int tagStart = bos.size();
             dos.writeByte(ftypeTag.serialize());
             int tagEnd = bos.size();
@@ -139,9 +155,17 @@
             for (int j = 0; j < fieldNames.size(); j++) {
                 SimpleValueReference fieldName = fieldNames.get(j);
                 SimpleValueReference fieldTypeTag = fieldTypeTags.get(j);
-                if (fieldName.equals(reqFieldName) && fieldTypeTag.equals(reqFieldTypeTag)) {
-                    matched = true;
-                    break;
+                if (fieldName.equals(reqFieldName)) {
+                    if (fieldTypeTag.equals(reqFieldTypeTag)) {
+                        matched = true;
+                        break;
+                    }
+
+                    // match the null type of optional field
+                    if (optionalFields[i] && fieldTypeTag.equals(nullTypeTag)) {
+                        matched = true;
+                        break;
+                    }
                 }
             }
             if (matched)