fix issue258
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
index 4728969..3f96b8e 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -101,16 +102,32 @@
         LogicalVariable recordVar = payloadVars.get(0);
         IVariableTypeEnvironment env = insertDeleteOperator.computeOutputTypeEnvironment(context);
         IAType inputRecordType = (IAType) env.getVarType(recordVar);
-        if (capatible(requiredRecordType, inputRecordType)) {
-            return false;
+
+        // the input record type can be an union type -- for the case when it comes from a subplan or left-outer join
+        boolean checkNull = false;
+        while (isOptional(inputRecordType)) {
+            //while-loop for the case there is a nested multi-level union
+            inputRecordType = ((AUnionType) inputRecordType).getUnionList().get(
+                    NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
+            checkNull = true;
         }
 
-        LogicalVariable replacedVar = addCast(requiredRecordType, recordVar, insertDeleteOp, context);
-        return replacedVar == null;
+        //see whether the input record type needs to be casted
+        boolean cast = !compatible(requiredRecordType, inputRecordType);
+
+        if (checkNull) {
+            recordVar = addWrapperFunction(requiredRecordType, recordVar, insertDeleteOp, context,
+                    AsterixBuiltinFunctions.NOT_NULL);
+        }
+        if (cast) {
+            addWrapperFunction(requiredRecordType, recordVar, insertDeleteOp, context,
+                    AsterixBuiltinFunctions.CAST_RECORD);
+        }
+        return cast || checkNull;
     }
 
     /**
-     * Inject a cast-record function when necessary
+     * Inject a function to wrap a variable when necessary
      * 
      * @param requiredRecordType
      *            the required record type
@@ -120,45 +137,48 @@
      *            the current parent operator to be rewritten
      * @param context
      *            the optimization context
+     * @param fd
+     *            the function to be injected
      * @return true if cast is injected; false otherwise.
      * @throws AlgebricksException
      */
-    public LogicalVariable addCast(ARecordType requiredRecordType, LogicalVariable recordVar, ILogicalOperator parent,
-            IOptimizationContext context) throws AlgebricksException {
+    public LogicalVariable addWrapperFunction(ARecordType requiredRecordType, LogicalVariable recordVar,
+            ILogicalOperator parent, IOptimizationContext context, FunctionIdentifier fd) throws AlgebricksException {
         List<Mutable<ILogicalOperator>> opRefs = parent.getInputs();
         for (int index = 0; index < opRefs.size(); index++) {
             Mutable<ILogicalOperator> opRef = opRefs.get(index);
             ILogicalOperator op = opRef.getValue();
 
+            //get produced vars
             List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
             VariableUtilities.getProducedVariables(op, producedVars);
             IVariableTypeEnvironment env = op.computeOutputTypeEnvironment(context);
-
             for (int i = 0; i < producedVars.size(); i++) {
                 LogicalVariable var = producedVars.get(i);
                 if (var.equals(recordVar)) {
+                    //insert an assign operator to call the function on-top-of the variable
                     IAType actualType = (IAType) env.getVarType(var);
-                    if (!capatible(requiredRecordType, actualType)) {
-                        AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
-                                FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CAST_RECORD));
-                        cast.getArguments().add(
-                                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
-                        TypeComputerUtilities.setRequiredAndInputTypes(cast, requiredRecordType, actualType);
-                        LogicalVariable newAssignVar = context.newVar();
-                        AssignOperator newAssignOperator = new AssignOperator(newAssignVar,
-                                new MutableObject<ILogicalExpression>(cast));
-                        newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op));
-                        opRef.setValue(newAssignOperator);
-                        context.computeAndSetTypeEnvironmentForOperator(parent);
-                        context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);
-                        newAssignOperator.computeOutputTypeEnvironment(context);
-                        VariableUtilities.substituteVariables(parent, recordVar, newAssignVar, context);
-                        return newAssignVar;
-                    }
+                    AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+                            FunctionUtils.getFunctionInfo(fd));
+                    cast.getArguments()
+                            .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
+                    //enforce the required record type
+                    TypeComputerUtilities.setRequiredAndInputTypes(cast, requiredRecordType, actualType);
+                    LogicalVariable newAssignVar = context.newVar();
+                    AssignOperator newAssignOperator = new AssignOperator(newAssignVar,
+                            new MutableObject<ILogicalExpression>(cast));
+                    newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op));
+                    opRef.setValue(newAssignOperator);
+                    context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);
+                    newAssignOperator.computeOutputTypeEnvironment(context);
+                    VariableUtilities.substituteVariables(parent, recordVar, newAssignVar, context);
+                    return newAssignVar;
                 }
             }
-            LogicalVariable replacedVar = addCast(requiredRecordType, recordVar, op, context);
+            //recursive descend to the operator who produced the recordVar
+            LogicalVariable replacedVar = addWrapperFunction(requiredRecordType, recordVar, op, context, fd);
             if (replacedVar != null) {
+                //substitute the recordVar by the replacedVar for operators who uses recordVar 
                 VariableUtilities.substituteVariables(parent, recordVar, replacedVar, context);
                 return replacedVar;
             }
@@ -166,10 +186,22 @@
         return null;
     }
 
-    private boolean capatible(ARecordType reqType, IAType inputType) {
+    /**
+     * Check whether the required record type and the input type is compatible
+     * 
+     * @param reqType
+     * @param inputType
+     * @return true if compatible; false otherwise
+     * @throws AlgebricksException
+     */
+    private boolean compatible(ARecordType reqType, IAType inputType) throws AlgebricksException {
         if (inputType.getTypeTag() == ATypeTag.ANY) {
             return false;
         }
+        if (inputType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("The input type " + inputType + " is not a valid record type!");
+        }
+
         IAType[] reqTypes = reqType.getFieldTypes();
         String[] reqFieldNames = reqType.getFieldNames();
         IAType[] inputTypes = ((ARecordType) inputType).getFieldTypes();
@@ -183,14 +215,16 @@
                 return false;
             }
             IAType reqTypeInside = reqTypes[i];
-            if (reqTypes[i].getTypeTag() == ATypeTag.UNION
-                    && NonTaggedFormatUtil.isOptionalField((AUnionType) reqTypes[i])) {
+            if (isOptional(reqTypes[i])) {
                 reqTypeInside = ((AUnionType) reqTypes[i]).getUnionList().get(
                         NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
             }
             IAType inputTypeInside = inputTypes[i];
-            if (inputTypes[i].getTypeTag() == ATypeTag.UNION
-                    && NonTaggedFormatUtil.isOptionalField((AUnionType) inputTypes[i])) {
+            if (isOptional(inputTypes[i])) {
+                if (!isOptional(reqTypes[i])) {
+                    // if the required type is not optional, the two types are incompatible
+                    return false;
+                }
                 inputTypeInside = ((AUnionType) inputTypes[i]).getUnionList().get(
                         NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
             }
@@ -200,4 +234,14 @@
         }
         return true;
     }
+
+    /**
+     * Decide whether a type is an optional type
+     * 
+     * @param type
+     * @return true if it is optional; false otherwise
+     */
+    private boolean isOptional(IAType type) {
+        return type.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) type);
+    }
 }
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.1.ddl.aql
new file mode 100644
index 0000000..5b219b5
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.1.ddl.aql
@@ -0,0 +1,18 @@
+/*
+ * Description  : This test case is to verify the fix for issue258
+ 				: https://code.google.com/p/asterixdb/issues/detail?id=258
+ * Expected Res : Success
+ * Date         : 21 May 2013
+ */
+
+drop dataverse test if exists;
+create dataverse test if not exists;
+use dataverse test;
+
+create type t1 as closed {
+id:int32
+};
+
+
+create dataset ds1(t1) primary key id;
+create dataset ds2(t1) primary key id;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.2.update.aql
new file mode 100644
index 0000000..fb334ad
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.2.update.aql
@@ -0,0 +1,20 @@
+/*
+ * Description  : This test case is to verify the fix for issue258
+ 				: https://code.google.com/p/asterixdb/issues/detail?id=258
+ * Expected Res : Success
+ * Date         : 21 May 2013
+ */
+
+use dataverse test;
+
+insert into dataset ds1(
+let $L:= 
+  for $x in dataset('ds2')
+  where $x.id = 10
+  return $x
+return
+  if (count($L) <= 0) then
+    {"id": 10}
+  else
+    {"id": 5}
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.3.query.aql
new file mode 100644
index 0000000..fff488a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue258/query-issue258.3.query.aql
@@ -0,0 +1,11 @@
+/*
+ * Description  : This test case is to verify the fix for issue258
+ 				: https://code.google.com/p/asterixdb/issues/detail?id=258
+ * Expected Res : Success
+ * Date         : 21 May 2013
+ */
+
+use dataverse test;
+
+for $d in dataset ds1
+return $d;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue258/query-issue258.1.adm b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue258/query-issue258.1.adm
new file mode 100644
index 0000000..0e739c3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue258/query-issue258.1.adm
@@ -0,0 +1 @@
+{ "id": 10 }
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index ff3126a..340dcea 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2726,7 +2726,7 @@
     </test-case>
     <test-case FilePath="open-closed">
       <compilation-unit name="query-issue410">
-        <output-dir compare="Text">query-issue40</output-dir>
+        <output-dir compare="Text">query-issue410</output-dir>
         <expected-error>edu.uci.ics.asterix.common.exceptions.AsterixException</expected-error>
       </compilation-unit>
     </test-case>
@@ -2740,6 +2740,11 @@
 	<output-dir compare="Text">query-issue453-2</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="open-closed">
+      <compilation-unit name="query-issue258">
+        <output-dir compare="Text">query-issue258</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="quantifiers">
     <test-case FilePath="quantifiers">
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index bea62dc..1eed0b9 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.asterix.om.typecomputer.impl.CastListResultTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.CastRecordResultTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
+import edu.uci.ics.asterix.om.typecomputer.impl.ConcatNonNullTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
 import edu.uci.ics.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.NonTaggedCollectionMemberResultType;
@@ -85,7 +86,6 @@
 import edu.uci.ics.asterix.om.types.AbstractCollectionType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.TypeHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
@@ -581,22 +581,8 @@
         add(CARET, NonTaggedNumericAddSubMulDivTypeComputer.INSTANCE);
         add(CIRCLE_CONSTRUCTOR, OptionalACircleTypeComputer.INSTANCE);
         add(CLOSED_RECORD_CONSTRUCTOR, ClosedRecordConstructorResultType.INSTANCE);
-        add(CONCAT_NON_NULL, new IResultTypeComputer() {
-            @Override
-            public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
-                    IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
-                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
-                if (f.getArguments().size() < 1) {
-                    return BuiltinType.ANULL;
-                }
-                ILogicalExpression a0 = f.getArguments().get(0).getValue();
-                IAType t0 = (IAType) env.getType(a0);
-                if (TypeHelper.canBeNull(t0)) {
-                    return t0;
-                }
-                return AUnionType.createNullableType(t0);
-            }
-        });
+        add(CONCAT_NON_NULL, ConcatNonNullTypeComputer.INSTANCE);
+
         add(CONTAINS, ABooleanTypeComputer.INSTANCE);
         addPrivateFunction(COUNT, AInt32TypeComputer.INSTANCE);
         add(COUNTHASHED_GRAM_TOKENS, OrderedListOfAInt32TypeComputer.INSTANCE);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
new file mode 100644
index 0000000..7bf2668
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.om.typecomputer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+/**
+ * The type computer for concat-not-null.
+ * Note that this function is only used for the if-then-else clause.
+ * 
+ * @author yingyib
+ */
+public class ConcatNonNullTypeComputer implements IResultTypeComputer {
+
+    public static final ConcatNonNullTypeComputer INSTANCE = new ConcatNonNullTypeComputer();
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+            IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() < 1) {
+            return BuiltinType.ANULL;
+        }
+        List<IAType> possibleTypes = new ArrayList<IAType>();
+        for (int i = 0; i < f.getArguments().size(); i++) {
+            ILogicalExpression arg = f.getArguments().get(i).getValue();
+            IAType type = (IAType) env.getType(arg);
+            if (type.getTypeTag() == ATypeTag.UNION) {
+                List<IAType> typeList = ((AUnionType) type).getUnionList();
+                for (IAType t : typeList) {
+                    if (t.getTypeTag() != ATypeTag.NULL) {
+                        //CONCAT_NON_NULL cannot return null because it's only used for if-else construct
+                        if (!possibleTypes.contains(t))
+                            possibleTypes.add(t);
+                    }
+                }
+            } else {
+                if (!possibleTypes.contains(type))
+                    possibleTypes.add(type);
+            }
+        }
+        if (possibleTypes.size() == 1) {
+            return possibleTypes.get(0);
+        } else {
+            throw new AlgebricksException("The two branches of the if-else clause should return the same type.");
+        }
+    }
+
+}