[ASTERIXDB-3263][COMP] Incorrect data inserted when optional field in closed type

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Update the dynamic type cast rule to correctly check type compatibility
  in case of insert/upsert.
- Update the constant folding rule to allow constant fold closed records.

Change-Id: I3469da3f4f70759886fe9b98faf99eaaceff1edc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17778
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 9fe1ba4..3b39d7c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -395,12 +395,14 @@
             if (fi.isExternal()) {
                 return false;
             }
+            IAType returnType = (IAType) _emptyTypeEnv.getType(function);
             // skip all functions that would produce records/arrays/multisets (derived types) in their open format
             // this is because constant folding them will make them closed (currently)
             if (function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
-                return false;
+                if (returnType.getTypeTag() != ATypeTag.OBJECT || ((ARecordType) returnType).isOpen()) {
+                    return false;
+                }
             }
-            IAType returnType = (IAType) _emptyTypeEnv.getType(function);
             return canConstantFoldType(returnType);
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FullTextContainsParameterCheckAndSetRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FullTextContainsParameterCheckAndSetRule.java
index 3e4e563..3b2153c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FullTextContainsParameterCheckAndSetRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FullTextContainsParameterCheckAndSetRule.java
@@ -26,9 +26,11 @@
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.utils.FullTextUtil;
+import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.utils.ConstantExpressionUtil;
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsFunctionDescriptor;
@@ -239,82 +241,111 @@
                 List<Mutable<ILogicalExpression>> newArgs, String functionName) throws AlgebricksException {
             String ftConfigName = null;
 
-            // Get the last parameter - this should be a record-constructor.
-            AbstractFunctionCallExpression openRecConsExpr = (AbstractFunctionCallExpression) expr.getValue();
-            FunctionIdentifier openRecConsFi = openRecConsExpr.getFunctionIdentifier();
-            if (openRecConsFi != BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR
-                    && openRecConsFi != BuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR) {
-                throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, openRecConsExpr.getSourceLocation(),
-                        functionName, openRecConsFi);
-            }
-
-            // We multiply 2 because the layout of the arguments are: [expr, val, expr1, val1, ...]
-            if (openRecConsExpr.getArguments().size() > FullTextContainsFunctionDescriptor.getParamTypeMap().size()
-                    * 2) {
-                throw CompilationException.create(ErrorCode.TOO_MANY_OPTIONS_FOR_FUNCTION,
-                        openRecConsExpr.getSourceLocation(), functionName);
-            }
-
-            if (openRecConsExpr.getArguments().size() % 2 != 0) {
-                throw CompilationException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
-                        openRecConsExpr.getSourceLocation(), functionName);
-            }
-
-            for (int i = 0; i < openRecConsExpr.getArguments().size(); i = i + 2) {
-                ILogicalExpression optionExpr = openRecConsExpr.getArguments().get(i).getValue();
-                ILogicalExpression optionExprVal = openRecConsExpr.getArguments().get(i + 1).getValue();
-
-                String option = ConstantExpressionUtil.getStringConstant(optionExpr);
-
-                if (optionExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT || option == null) {
-                    throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, optionExpr.getSourceLocation(),
-                            functionName, optionExpr.getExpressionTag());
+            // Get the last parameter - this should be a record-constructor or a constant expression.
+            if (expr.getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+                ConstantExpression constantExpression = (ConstantExpression) expr.getValue();
+                ARecord record =
+                        (ARecord) ConstantExpressionUtil.getConstantIaObject(constantExpression, ATypeTag.OBJECT);
+                if (record == null) {
+                    throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED,
+                            constantExpression.getSourceLocation(), functionName,
+                            constantExpression.getExpressionTag());
+                }
+                ARecordType recordType = record.getType();
+                if (record.numberOfFields() > FullTextContainsFunctionDescriptor.getParamTypeMap().size()) {
+                    throw CompilationException.create(ErrorCode.TOO_MANY_OPTIONS_FOR_FUNCTION,
+                            constantExpression.getSourceLocation(), functionName);
+                }
+                for (int i = 0; i < record.numberOfFields(); i++) {
+                    String option = recordType.getFieldNames()[i].toLowerCase();
+                    ILogicalExpression optionExpr =
+                            new ConstantExpression(new AsterixConstantValue(new AString(option)));
+                    ILogicalExpression optionExprVal =
+                            new ConstantExpression(new AsterixConstantValue(record.getValueByPos(i)));
+                    ftConfigName = handleThirdParameterOptions(optionExpr, optionExprVal, newArgs, functionName);
+                }
+            } else {
+                AbstractFunctionCallExpression openRecConsExpr = (AbstractFunctionCallExpression) expr.getValue();
+                FunctionIdentifier openRecConsFi = openRecConsExpr.getFunctionIdentifier();
+                if (openRecConsFi != BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR
+                        && openRecConsFi != BuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR) {
+                    throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, openRecConsExpr.getSourceLocation(),
+                            functionName, openRecConsFi);
                 }
 
-                option = option.toLowerCase();
-                if (!FullTextContainsFunctionDescriptor.getParamTypeMap().containsKey(option)) {
-                    throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, optionExprVal.getSourceLocation(),
-                            functionName, option);
+                // We multiply 2 because the layout of the arguments are: [expr, val, expr1, val1, ...]
+                if (openRecConsExpr.getArguments().size() > FullTextContainsFunctionDescriptor.getParamTypeMap().size()
+                        * 2) {
+                    throw CompilationException.create(ErrorCode.TOO_MANY_OPTIONS_FOR_FUNCTION,
+                            openRecConsExpr.getSourceLocation(), functionName);
                 }
 
-                String optionTypeStringVal = null;
-                // If the option value is a constant, then we can check here.
-                if (optionExprVal.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
-                    switch (FullTextContainsFunctionDescriptor.getParamTypeMap().get(option)) {
-                        case STRING:
-                            optionTypeStringVal = ConstantExpressionUtil.getStringConstant(optionExprVal);
-                            if (optionTypeStringVal == null) {
-                                throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED,
-                                        optionExprVal.getSourceLocation(), functionName, option);
-                            }
-                            optionTypeStringVal = optionTypeStringVal.toLowerCase();
-                            break;
-                        default:
-                            // Currently, we only have a string parameter. So, the flow doesn't reach here.
+                if (openRecConsExpr.getArguments().size() % 2 != 0) {
+                    throw CompilationException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
+                            openRecConsExpr.getSourceLocation(), functionName);
+                }
+
+                for (int i = 0; i < openRecConsExpr.getArguments().size(); i = i + 2) {
+                    ILogicalExpression optionExpr = openRecConsExpr.getArguments().get(i).getValue();
+                    ILogicalExpression optionExprVal = openRecConsExpr.getArguments().get(i + 1).getValue();
+                    ftConfigName = handleThirdParameterOptions(optionExpr, optionExprVal, newArgs, functionName);
+                }
+            }
+            return ftConfigName;
+        }
+
+        private String handleThirdParameterOptions(ILogicalExpression optionExpr, ILogicalExpression optionExprVal,
+                List<Mutable<ILogicalExpression>> newArgs, String functionName) throws AlgebricksException {
+            String ftConfigName = null;
+            String option = ConstantExpressionUtil.getStringConstant(optionExpr);
+
+            if (optionExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT || option == null) {
+                throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, optionExpr.getSourceLocation(),
+                        functionName, optionExpr.getExpressionTag());
+            }
+
+            option = option.toLowerCase();
+            if (!FullTextContainsFunctionDescriptor.getParamTypeMap().containsKey(option)) {
+                throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, optionExprVal.getSourceLocation(),
+                        functionName, option);
+            }
+
+            String optionTypeStringVal = null;
+            // If the option value is a constant, then we can check here.
+            if (optionExprVal.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+                switch (FullTextContainsFunctionDescriptor.getParamTypeMap().get(option)) {
+                    case STRING:
+                        optionTypeStringVal = ConstantExpressionUtil.getStringConstant(optionExprVal);
+                        if (optionTypeStringVal == null) {
                             throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED,
                                     optionExprVal.getSourceLocation(), functionName, option);
-                    }
-
-                    // Check the validity of option value
-                    switch (option) {
-                        case FullTextContainsFunctionDescriptor.SEARCH_MODE_OPTION:
-                            checkSearchModeOption(optionTypeStringVal, functionName, optionExprVal.getSourceLocation());
-                            break;
-                        case FullTextContainsFunctionDescriptor.FULLTEXT_CONFIG_OPTION:
-                            checkFullTextConfigOption(optionTypeStringVal, functionName,
-                                    optionExprVal.getSourceLocation());
-                            ftConfigName = optionTypeStringVal;
-                            break;
-                        default:
-                            throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED,
-                                    optionExprVal.getSourceLocation(), functionName, option);
-                    }
+                        }
+                        optionTypeStringVal = optionTypeStringVal.toLowerCase();
+                        break;
+                    default:
+                        // Currently, we only have a string parameter. So, the flow doesn't reach here.
+                        throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, optionExprVal.getSourceLocation(),
+                                functionName, option);
                 }
 
-                // Add this option as arguments to the ftcontains().
-                newArgs.add(new MutableObject<ILogicalExpression>(optionExpr));
-                newArgs.add(new MutableObject<ILogicalExpression>(optionExprVal));
+                // Check the validity of option value
+                switch (option) {
+                    case FullTextContainsFunctionDescriptor.SEARCH_MODE_OPTION:
+                        checkSearchModeOption(optionTypeStringVal, functionName, optionExprVal.getSourceLocation());
+                        break;
+                    case FullTextContainsFunctionDescriptor.FULLTEXT_CONFIG_OPTION:
+                        checkFullTextConfigOption(optionTypeStringVal, functionName, optionExprVal.getSourceLocation());
+                        ftConfigName = optionTypeStringVal;
+                        break;
+                    default:
+                        throw CompilationException.create(ErrorCode.TYPE_UNSUPPORTED, optionExprVal.getSourceLocation(),
+                                functionName, option);
+                }
             }
+
+            // Add this option as arguments to the ftcontains().
+            newArgs.add(new MutableObject<>(optionExpr));
+            newArgs.add(new MutableObject<>(optionExprVal));
             return ftConfigName;
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java
index 22d87ac..e8f9048 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java
@@ -81,8 +81,7 @@
             reqArgType = ((ExternalFunctionInfo) funcCallExpr.getFunctionInfo()).getParameterTypes().get(i);
 
             if (reqArgType.getTypeTag() == ATypeTag.OBJECT) {
-                castFlag = !IntroduceDynamicTypeCastRule.compatible((ARecordType) reqArgType, inputType,
-                        argExpr.getValue().getSourceLocation());
+                castFlag = !IntroduceDynamicTypeCastRule.compatible((ARecordType) reqArgType, inputType, op);
             } else if (reqArgType.getTypeTag() == ATypeTag.ANY) {
                 IAType inputPrimeType = TypeComputeUtils.getActualType(inputType);
                 castFlag = inputPrimeType.getTypeTag().isDerivedType();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
index 4044965..a9ad7ce 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -167,7 +167,7 @@
         }
 
         /** see whether the input record type needs to be casted */
-        boolean cast = !compatible(requiredRecordType, inputRecordType, op.getSourceLocation());
+        boolean cast = !compatible(requiredRecordType, inputRecordType, op);
 
         if (checkUnknown) {
             recordVar = addWrapperFunction(requiredRecordType, recordVar, op, context, BuiltinFunctions.CHECK_UNKNOWN);
@@ -252,8 +252,9 @@
      * @return true if compatible; false otherwise
      * @throws AlgebricksException
      */
-    public static boolean compatible(ARecordType reqType, IAType inputType, SourceLocation sourceLoc)
+    public static boolean compatible(ARecordType reqType, IAType inputType, ILogicalOperator op)
             throws AlgebricksException {
+        SourceLocation sourceLoc = op.getSourceLocation();
         if (inputType.getTypeTag() == ATypeTag.ANY) {
             return false;
         }
@@ -279,19 +280,25 @@
                 return false;
             }
             IAType reqTypeInside = reqTypes[i];
-            if (NonTaggedFormatUtil.isOptional(reqTypes[i])) {
-                reqTypeInside = ((AUnionType) reqTypes[i]).getActualType();
-            }
             IAType inputTypeInside = inputTypes[i];
-            if (NonTaggedFormatUtil.isOptional(inputTypes[i])) {
-                if (!NonTaggedFormatUtil.isOptional(reqTypes[i])) {
-                    /** if the required type is not optional, the two types are incompatible */
+            if (op.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                if (!reqTypeInside.equals(inputTypeInside)) {
                     return false;
                 }
-                inputTypeInside = ((AUnionType) inputTypes[i]).getActualType();
-            }
-            if (inputTypeInside.getTypeTag() != ATypeTag.MISSING && !reqTypeInside.equals(inputTypeInside)) {
-                return false;
+            } else {
+                if (NonTaggedFormatUtil.isOptional(reqTypes[i])) {
+                    reqTypeInside = ((AUnionType) reqTypes[i]).getActualType();
+                }
+                if (NonTaggedFormatUtil.isOptional(inputTypes[i])) {
+                    if (!NonTaggedFormatUtil.isOptional(reqTypes[i])) {
+                        /** if the required type is not optional, the two types are incompatible */
+                        return false;
+                    }
+                    inputTypeInside = ((AUnionType) inputTypes[i]).getActualType();
+                }
+                if (inputTypeInside.getTypeTag() != ATypeTag.MISSING && !reqTypeInside.equals(inputTypeInside)) {
+                    return false;
+                }
             }
         }
         return true;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/index-through-object/index-through-object.9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/index-through-object/index-through-object.9.plan
index 1d2e55b..9b51cb0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/index-through-object/index-through-object.9.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/index-through-object/index-through-object.9.plan
@@ -2,46 +2,38 @@
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
-        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- NESTED_LOOP  |PARTITIONED|
-            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STREAM_PROJECT  |PARTITIONED|
-                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  -- HYBRID_HASH_JOIN [$$82][$$83]  |PARTITIONED|
-                    -- HASH_PARTITION_EXCHANGE [$$82]  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
-                        -- STREAM_SELECT  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- BTREE_SEARCH (Test.Users.Users)  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- STABLE_SORT [$$106(ASC)]  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- BTREE_SEARCH (Test.Users.usersNameIdx)  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                    -- HASH_PARTITION_EXCHANGE [$$83]  |PARTITIONED|
-                      -- STREAM_SELECT  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- BTREE_SEARCH (Test.Users.Users)  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- STABLE_SORT [$$110(ASC)]  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- BTREE_SEARCH (Test.Users.usersNameIdx)  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-            -- BROADCAST_EXCHANGE  |PARTITIONED|
-              -- STREAM_SELECT  |UNPARTITIONED|
-                -- ASSIGN  |UNPARTITIONED|
-                  -- UNNEST  |UNPARTITIONED|
-                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- HYBRID_HASH_JOIN [$$88][$$89]  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$88]  |PARTITIONED|
+                -- ASSIGN  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- BTREE_SEARCH (Test.Users.Users)  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$110(ASC)]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH (Test.Users.usersNameIdx)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$89]  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- BTREE_SEARCH (Test.Users.Users)  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$114(ASC)]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH (Test.Users.usersNameIdx)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan
index 9a296a5..5f89a6d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan
@@ -4,7 +4,7 @@
       -- INSERT_DELETE  |PARTITIONED|
         -- HASH_PARTITION_EXCHANGE [$$3]  |PARTITIONED|
           -- ASSIGN  |UNPARTITIONED|
-            -- STREAM_PROJECT  |UNPARTITIONED|
-              -- ASSIGN  |UNPARTITIONED|
+            -- ASSIGN  |UNPARTITIONED|
+              -- STREAM_PROJECT  |UNPARTITIONED|
                 -- ASSIGN  |UNPARTITIONED|
                   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/index-through-object/index-through-object.9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/index-through-object/index-through-object.9.plan
index 75a50b4..a7f92e3 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/index-through-object/index-through-object.9.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/index-through-object/index-through-object.9.plan
@@ -2,40 +2,32 @@
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
-        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- NESTED_LOOP  |PARTITIONED|
-            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STREAM_PROJECT  |PARTITIONED|
-                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  -- HYBRID_HASH_JOIN [$$88][$$89]  |PARTITIONED|
-                    -- HASH_PARTITION_EXCHANGE [$$88]  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
-                        -- STREAM_SELECT  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- REPLICATE  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- DATASOURCE_SCAN (Test.Users)  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                    -- HASH_PARTITION_EXCHANGE [$$89]  |PARTITIONED|
-                      -- STREAM_SELECT  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- HYBRID_HASH_JOIN [$$88][$$89]  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$88]  |PARTITIONED|
+                -- ASSIGN  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- REPLICATE  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- DATASOURCE_SCAN (Test.Users)  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-            -- BROADCAST_EXCHANGE  |PARTITIONED|
-              -- STREAM_SELECT  |UNPARTITIONED|
-                -- ASSIGN  |UNPARTITIONED|
-                  -- UNNEST  |UNPARTITIONED|
-                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- REPLICATE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN (Test.Users)  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$89]  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- REPLICATE  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN (Test.Users)  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.1.ddl.sqlpp
new file mode 100644
index 0000000..b3ab9eb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.1.ddl.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type testtype01 as
+ closed {
+  id : string,
+  name : string?
+};
+
+create  dataset testds01(testtype01) primary key id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.2.update.sqlpp
new file mode 100644
index 0000000..8f7d83e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.2.update.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into testds01([{"id":"1", "name":"John1"}]);
+
+insert into testds01 {"id":"2", "name":"John2"};
+
+insert into testds01([{"id":"3", "name":"John3"}, {"id":"4", "name":"John4"}]);
+
+insert into testds01([{"id":"5", "name":"John5"}, {"id":"6"}]);
+
+insert into testds01([{"id":"7"}, {"id":"8", "name":"John8"}]);
+
+insert into testds01([{"id":"9"}]);
+
+insert into testds01 select element {"id":"10", "name":"John10"};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.3.query.sqlpp
new file mode 100644
index 0000000..be5fb5d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-closed/insert-closed.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select * from testds01 order by id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_remove/array_remove.5.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_remove/array_remove.5.plan
index 0100d55..5b6ddc1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_remove/array_remove.5.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_remove/array_remove.5.plan
@@ -2,7 +2,7 @@
 -- DISTRIBUTE_RESULT  |UNPARTITIONED|
   exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
   -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-    unnest $$d <- scan-collection(ordered-list-constructor({"id": 1, "t1": array-remove(ordered-list-constructor(1, 2, 3, ordered-list-constructor(9, 8), ordered-list-constructor("str1", "str2"), ordered-list-constructor(90, 100)), array: [ 9, 8 ], array: [ 90, 100 ])}, {"id": 2, "t2": cast(array: [ array: [ 5, 1, 2 ], array: [ 90, 100 ] ])}, {"id": 3, "t3": cast(array-remove(ordered-list-constructor({"id": 1, "age": 34}, {"id": 2, "age": 29}, {"id": 3, "age": 90}), {"id": 4, "age": 90}, {"id": 2, "age": 29}))})) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    unnest $$d <- scan-collection(ordered-list-constructor({"id": 1, "t1": array-remove(ordered-list-constructor(1, 2, 3, ordered-list-constructor(9, 8), ordered-list-constructor("str1", "str2"), ordered-list-constructor(90, 100)), array: [ 9, 8 ], array: [ 90, 100 ])}, {"id": 2, "t2": cast(array: [ array: [ 5, 1, 2 ], array: [ 90, 100 ] ])}, {"id": 3, "t3": cast(array: [ { "id": 1, "age": 34 }, { "id": 3, "age": 90 } ])})) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
     -- UNNEST  |UNPARTITIONED|
       empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
       -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-closed/insert-closed.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-closed/insert-closed.1.adm
new file mode 100644
index 0000000..4166875
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-closed/insert-closed.1.adm
@@ -0,0 +1,10 @@
+{ "testds01": { "id": "1", "name": "John1" } }
+{ "testds01": { "id": "10", "name": "John10" } }
+{ "testds01": { "id": "2", "name": "John2" } }
+{ "testds01": { "id": "3", "name": "John3" } }
+{ "testds01": { "id": "4", "name": "John4" } }
+{ "testds01": { "id": "5", "name": "John5" } }
+{ "testds01": { "id": "6" } }
+{ "testds01": { "id": "7" } }
+{ "testds01": { "id": "8", "name": "John8" } }
+{ "testds01": { "id": "9" } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6d83fb5..1800d47 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4570,6 +4570,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="dml">
+      <compilation-unit name="insert-closed">
+        <output-dir compare="Text">insert-closed</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
       <compilation-unit name="insert-duplicated-keys">
         <output-dir compare="Text">insert-duplicated-keys</output-dir>
         <expected-error>Inserting duplicate keys into the primary storage</expected-error>