ASTERIXDB-1451: Remove Record Casting for insert/delete/upsert

This change includes the following:
- Introduce cast function in case of delete operation
  after the primary index to ensure types are passed
  correctly to enforced indexes.
- Introduce cast function in case of upsert operation
  before old secondary keys extraction to ensure types
  are passed correctly to enforced indexes.
- Replace all record casts with open field casts for
  insert/delete/upsert operations.
- Sonar-Qube fixes.

Change-Id: I6a80105798ea1c86a6a0eb69a79b9573b54931b7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1146
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index c64258f..c487a96 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -20,10 +20,9 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Stack;
+import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -32,25 +31,22 @@
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlIndex;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -77,6 +73,12 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+/**
+ * This rule matches the pattern:
+ * assign --> insert-delete-upsert --> sink
+ * and produces
+ * assign --> insert-delete-upsert --> *(secondary indexes index-insert-delete-upsert) --> sink
+ */
 public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewriteRule {
 
     @Override
@@ -88,71 +90,38 @@
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
-        if (op0.getOperatorTag() != LogicalOperatorTag.SINK) {
+        AbstractLogicalOperator sinkOp = (AbstractLogicalOperator) opRef.getValue();
+        if (sinkOp.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
-        if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+        if (sinkOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
-
-        FunctionIdentifier fid = null;
         /** find the record variable */
-        InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) op1;
-        boolean isBulkload = insertOp.isBulkload();
-        ILogicalExpression recordExpr = insertOp.getPayloadExpression().getValue();
-        List<Mutable<ILogicalExpression>> metaExprs = insertOp.getAdditionalNonFilteringExpressions();
-        LogicalVariable recordVar = null;
-        LogicalVariable metaVar = null;
-        List<LogicalVariable> usedRecordVars = new ArrayList<>();
-        /** assume the payload is always a single variable expression */
-        recordExpr.getUsedVariables(usedRecordVars);
-        if (usedRecordVars.size() == 1) {
-            recordVar = usedRecordVars.get(0);
-        }
-        if (metaExprs != null) {
-            List<LogicalVariable> metaVars = new ArrayList<>();
-            for (Mutable<ILogicalExpression> expr : metaExprs) {
-                expr.getValue().getUsedVariables(metaVars);
-            }
-            if (metaVars.size() > 1) {
-                throw new AlgebricksException(
-                        "Number of meta fields can't be more than 1. Number of meta fields found = " + metaVars.size());
-            }
-            metaVar = metaVars.get(0);
-        }
+        InsertDeleteUpsertOperator primaryIndexModificationOp =
+                (InsertDeleteUpsertOperator) sinkOp.getInputs().get(0).getValue();
+        boolean isBulkload = primaryIndexModificationOp.isBulkload();
+        ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue();
+        List<Mutable<ILogicalExpression>> newMetaExprs =
+                primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
+        LogicalVariable newRecordVar;
+        LogicalVariable newMetaVar = null;
 
         /**
-         * op2 is the assign operator which extracts primary keys from the input
+         * inputOp is the assign operator which extracts primary keys from the input
          * variables (record or meta)
          */
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        AbstractLogicalOperator inputOp =
+                (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue();
 
-        if (recordVar == null) {
-            /**
-             * For the case primary key-assignment expressions are constant
-             * expressions, find assign op that creates record to be
-             * inserted/deleted.
-             */
-            while (fid != AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) {
-                if (op2.getInputs().size() == 0) {
-                    return false;
-                }
-                op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
-                if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-                    continue;
-                }
-                AssignOperator assignOp = (AssignOperator) op2;
-                ILogicalExpression assignExpr = assignOp.getExpressions().get(0).getValue();
-                if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                    ScalarFunctionCallExpression funcExpr =
-                            (ScalarFunctionCallExpression) assignOp.getExpressions().get(0).getValue();
-                    fid = funcExpr.getFunctionIdentifier();
-                }
+        newRecordVar = getRecordVar(context, inputOp, newRecordExpr, 0);
+        if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
+            if (newMetaExprs.size() > 1) {
+                throw new AlgebricksException(
+                        "Number of meta records can't be more than 1. Number of meta records found = "
+                                + newMetaExprs.size());
             }
-            AssignOperator assignOp2 = (AssignOperator) op2;
-            recordVar = assignOp2.getVariables().get(0);
+            newMetaVar = getRecordVar(context, inputOp, newMetaExprs.get(0).getValue(), 1);
         }
 
         /*
@@ -160,10 +129,10 @@
          * Note: We have two operators:
          * 1. An InsertDeleteOperator (primary)
          * 2. An IndexInsertDeleteOperator (secondary)
-         * The current insertOp is of the first type
+         * The current primaryIndexModificationOp is of the first type
          */
 
-        AqlDataSource datasetSource = (AqlDataSource) insertOp.getDataSource();
+        AqlDataSource datasetSource = (AqlDataSource) primaryIndexModificationOp.getDataSource();
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         String dataverseName = datasetSource.getId().getDataverseName();
         String datasetName = datasetSource.getId().getDatasourceName();
@@ -175,7 +144,7 @@
             return false;
         }
 
-        // Create operators for secondary index insert/delete.
+        // Create operators for secondary index insert / delete.
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType = mp.findType(dataset.getItemTypeDataverseName(), itemTypeName);
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
@@ -187,49 +156,35 @@
         if (dataset.hasMetaPart()) {
             metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
         }
-        // recType might be replaced with enforced record type and we want to keep a reference to the original record
-        // type
-        ARecordType originalRecType = recType;
         List<Index> indexes = mp.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
         // Set the top operator pointer to the primary IndexInsertDeleteOperator
-        ILogicalOperator currentTop = op1;
+        ILogicalOperator currentTop = primaryIndexModificationOp;
         boolean hasSecondaryIndex = false;
 
         // Put an n-gram or a keyword index in the later stage of index-update,
         // since TokenizeOperator needs to be involved.
-        Collections.sort(indexes, new Comparator<Index>() {
-            @Override
-            public int compare(Index o1, Index o2) {
-                return o1.getIndexType().ordinal() - o2.getIndexType().ordinal();
-            }
-
-        });
-
-        // Check whether multiple indexes exist
-        int secondaryIndexTotalCnt = 0;
-        for (Index index : indexes) {
-            if (index.isSecondaryIndex()) {
-                secondaryIndexTotalCnt++;
-            }
-        }
+        Collections.sort(indexes, (o1, o2) -> o1.getIndexType().ordinal() - o2.getIndexType().ordinal());
 
         // At this point, we have the data type info, and the indexes info as well
+        int secondaryIndexTotalCnt = indexes.size() - 1;
         if (secondaryIndexTotalCnt > 0) {
-            op0.getInputs().clear();
+            sinkOp.getInputs().clear();
+        } else {
+            return false;
         }
         // Initialize inputs to the SINK operator Op0 (The SINK) is now without input
-
         // Prepare filtering field information (This is the filter created using the "filter with" key word in the
         // create dataset ddl)
         List<String> filteringFields = ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
-        List<LogicalVariable> filteringVars = null;
+        List<LogicalVariable> filteringVars;
         List<Mutable<ILogicalExpression>> filteringExpressions = null;
 
         if (filteringFields != null) {
             // The filter field var already exists. we can simply get it from the insert op
-            filteringVars = new ArrayList<LogicalVariable>();
-            filteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            for (Mutable<ILogicalExpression> filteringExpression : insertOp.getAdditionalFilteringExpressions()) {
+            filteringVars = new ArrayList<>();
+            filteringExpressions = new ArrayList<>();
+            for (Mutable<ILogicalExpression> filteringExpression : primaryIndexModificationOp
+                    .getAdditionalFilteringExpressions()) {
                 filteringExpression.getValue().getUsedVariables(filteringVars);
                 for (LogicalVariable var : filteringVars) {
                     filteringExpressions
@@ -237,51 +192,10 @@
                 }
             }
         }
-        LogicalVariable enforcedRecordVar = recordVar;
-
-        /*
-         * if the index is enforcing field types (For open indexes), We add a cast
-         * operator to ensure type safety
-         */
-        if (insertOp.getOperation() == Kind.INSERT || insertOp.getOperation() == Kind.UPSERT) {
-            try {
-                DatasetDataSource ds = (DatasetDataSource) (insertOp.getDataSource());
-                ARecordType insertRecType = (ARecordType) ds.getItemType();
-                // create the expected record type = the original + the optional open field
-                ARecordType enforcedType = createEnforcedType(insertRecType, indexes);
-                if (!enforcedType.equals(insertRecType)) {
-                    // A new variable which represents the casted record
-                    LogicalVariable castedRecVar = context.newVar();
-                    context.addNotToBeInlinedVar(castedRecVar);
-                    //introduce casting to enforced type
-                    AbstractFunctionCallExpression castFunc = new ScalarFunctionCallExpression(
-                            FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE));
-                    // The first argument is the record
-                    castFunc.getArguments()
-                            .add(new MutableObject<ILogicalExpression>(insertOp.getPayloadExpression().getValue()));
-                    TypeCastUtils.setRequiredAndInputTypes(castFunc, enforcedType, insertRecType);
-                    // AssignOperator puts in the cast var the casted record
-                    AssignOperator castedRecordAssignOperator =
-                            new AssignOperator(castedRecVar, new MutableObject<ILogicalExpression>(castFunc));
-                    // Connect the current top of the plan to the cast operator
-                    castedRecordAssignOperator.getInputs().addAll(currentTop.getInputs());
-                    currentTop.getInputs().clear();
-                    currentTop.getInputs().add(new MutableObject<>(castedRecordAssignOperator));
-                    enforcedRecordVar = castedRecVar;
-                    recType = enforcedType;
-                    context.computeAndSetTypeEnvironmentForOperator(castedRecordAssignOperator);
-                    context.computeAndSetTypeEnvironmentForOperator(currentTop);
-                    // We don't need to cast the old rec, we just need an assignment function that extracts the SK
-                    // and an expression which reference the new variables.
-                }
-            } catch (AsterixException e) {
-                throw new AlgebricksException(e);
-            }
-        }
 
         // Replicate Operator is applied only when doing the bulk-load.
-        AbstractLogicalOperator replicateOp = null;
-        if (secondaryIndexTotalCnt > 1 && insertOp.isBulkload()) {
+        ReplicateOperator replicateOp = null;
+        if (secondaryIndexTotalCnt > 1 && primaryIndexModificationOp.isBulkload()) {
             // Split the logical plan into "each secondary index update branch"
             // to replicate each <PK,RECORD> pair.
             replicateOp = new ReplicateOperator(secondaryIndexTotalCnt);
@@ -291,6 +205,50 @@
             currentTop = replicateOp;
         }
 
+        /*
+         * The two maps are used to store variables to which [casted] field access is assigned.
+         * One for the beforeOp record and the other for the new record.
+         * There are two uses for these maps:
+         * 1. used for shared fields in indexes with overlapping keys.
+         * 2. used for setting variables of secondary keys for each secondary index operator.
+         */
+        Map<IndexFieldId, LogicalVariable> fieldVarsForBeforeOperation = new HashMap<>();
+        Map<IndexFieldId, LogicalVariable> fieldVarsForNewRecord = new HashMap<>();
+        /*
+         * if the index is enforcing field types (For open indexes), We add a cast
+         * operator to ensure type safety
+         */
+        try {
+            if (primaryIndexModificationOp.getOperation() == Kind.INSERT
+                    || primaryIndexModificationOp.getOperation() == Kind.UPSERT
+                    /* Actually, delete should not be here but it is now until issue
+                     * https://issues.apache.org/jira/browse/ASTERIXDB-1507
+                     * is solved
+                     */
+                    || primaryIndexModificationOp.getOperation() == Kind.DELETE) {
+                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType,
+                        metaType, newRecordVar, newMetaVar, primaryIndexModificationOp, false);
+                if (replicateOp != null) {
+                    context.computeAndSetTypeEnvironmentForOperator(replicateOp);
+                }
+            }
+            if (primaryIndexModificationOp.getOperation() == Kind.UPSERT
+            /* Actually, delete should be here but it is not until issue
+             * https://issues.apache.org/jira/browse/ASTERIXDB-1507
+             * is solved
+             */) {
+                List<LogicalVariable> beforeOpMetaVars =
+                        primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
+                LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
+                currentTop =
+                        injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation, recType,
+                                metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
+                                currentTop, true);
+            }
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
         // Iterate each secondary index and applying Index Update operations.
         // At first, op1 is the index insert op insertOp
         for (Index index : indexes) {
@@ -300,88 +258,36 @@
             hasSecondaryIndex = true;
             // Get the secondary fields names and types
             List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = index.getKeyFieldTypes();
-            List<LogicalVariable> secondaryKeyVars = new ArrayList<LogicalVariable>();
-            List<Integer> indicators = index.getKeyFieldSourceIndicators();
-            List<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
-            List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            List<LogicalVariable> secondaryKeyVars = new ArrayList<>();
+            List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<>();
+            List<Mutable<ILogicalExpression>> beforeOpSecondaryExpressions = new ArrayList<>();
+            ILogicalOperator replicateOutput;
 
             for (int i = 0; i < secondaryKeyFields.size(); i++) {
-                List<String> secondaryKey = secondaryKeyFields.get(i);
-                ARecordType sourceType = recType;
-                LogicalVariable sourceVar = enforcedRecordVar;
-                if (dataset.hasMetaPart()) {
-                    sourceType = indicators.get(i).intValue() == 0 ? recType : metaType;
-                    sourceVar = indicators.get(i).intValue() == 0 ? enforcedRecordVar : metaVar;
+                IndexFieldId indexFieldId =
+                        new IndexFieldId(index.getKeyFieldSourceIndicators().get(i), secondaryKeyFields.get(i));
+                LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId);
+                secondaryKeyVars.add(skVar);
+                secondaryExpressions.add(new MutableObject<ILogicalExpression>(
+                        new VariableReferenceExpression(skVar)));
+                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
+                    beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
+                            new VariableReferenceExpression(fieldVarsForBeforeOperation.get(indexFieldId))));
                 }
-                prepareVarAndExpression(secondaryKey, sourceType.getFieldNames(), sourceVar, expressions,
-                        secondaryKeyVars, context);
-            }
-            // Used with upsert operation
-            // in case of upsert, we need vars and expressions for the old SK as well.
-            List<LogicalVariable> prevSecondaryKeyVars = null;
-            List<Mutable<ILogicalExpression>> prevExpressions = null;
-            List<Mutable<ILogicalExpression>> prevSecondaryExpressions = null;
-            AssignOperator prevSecondaryKeyAssign = null;
-            if (insertOp.getOperation() == Kind.UPSERT) {
-                prevSecondaryKeyVars = new ArrayList<LogicalVariable>();
-                prevExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-                prevSecondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-                for (int i = 0; i < secondaryKeyFields.size(); i++) {
-                    List<String> secondaryKey = secondaryKeyFields.get(i);
-                    prepareVarAndExpression(secondaryKey,
-                            (indicators.get(i).intValue() == 0) ? originalRecType.getFieldNames()
-                                    : metaType.getFieldNames(),
-                            (indicators.get(i).intValue() == 0) ? insertOp.getPrevRecordVar()
-                                    : insertOp.getPrevAdditionalNonFilteringVars().get(0),
-                            prevExpressions, prevSecondaryKeyVars, context);
-                }
-                prevSecondaryKeyAssign = new AssignOperator(prevSecondaryKeyVars, prevExpressions);
-            }
-            AssignOperator assign = new AssignOperator(secondaryKeyVars, expressions);
-            AssignOperator topAssign = assign;
-            if (insertOp.getOperation() == Kind.UPSERT) {
-                prevSecondaryKeyAssign.getInputs().add(new MutableObject<ILogicalOperator>(topAssign));
-                topAssign = prevSecondaryKeyAssign;
-            }
-            // Only apply replicate operator when doing bulk-load
-            if (secondaryIndexTotalCnt > 1 && insertOp.isBulkload()) {
-                assign.getInputs().add(new MutableObject<ILogicalOperator>(replicateOp));
-            } else {
-                assign.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
             }
 
-            context.computeAndSetTypeEnvironmentForOperator(assign);
-            if (insertOp.getOperation() == Kind.UPSERT) {
-                context.computeAndSetTypeEnvironmentForOperator(prevSecondaryKeyAssign);
-            }
-            currentTop = topAssign;
-
-            // in case of an Upsert operation, the currentTop is an assign which has the old secondary keys + the new secondary keys
-            if (index.getIndexType() == IndexType.BTREE || index.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
-                    || index.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
-                    || index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                    || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            IndexInsertDeleteUpsertOperator indexUpdate;
+            if (index.getIndexType() != IndexType.RTREE) {
                 // Create an expression per key
-                for (LogicalVariable secondaryKeyVar : secondaryKeyVars) {
-                    secondaryExpressions.add(
-                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
-                }
-                Mutable<ILogicalExpression> filterExpression = null;
-                if (insertOp.getOperation() == Kind.UPSERT) {
-                    for (LogicalVariable oldSecondaryKeyVar : prevSecondaryKeyVars) {
-                        prevSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(oldSecondaryKeyVar)));
-                    }
-                } else {
-                    filterExpression = createFilterExpression(secondaryKeyVars,
-                            context.getOutputTypeEnvironment(currentTop), false);
-                }
+                Mutable<ILogicalExpression> filterExpression =
+                        (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
+                                : createFilterExpression(secondaryKeyVars,
+                                        context.getOutputTypeEnvironment(currentTop), index.isEnforcingKeyFileds());
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
 
                 // Introduce the TokenizeOperator only when doing bulk-load,
                 // and index type is keyword or n-gram.
-                if (index.getIndexType() != IndexType.BTREE && insertOp.isBulkload()) {
+                if (index.getIndexType() != IndexType.BTREE && primaryIndexModificationOp.isBulkload()) {
                     // Note: Bulk load case, we don't need to take care of it for upsert operation
                     // Check whether the index is length-partitioned or not.
                     // If partitioned, [input variables to TokenizeOperator,
@@ -391,27 +297,25 @@
                     // and fed into the IndexInsertDeleteOperator.
                     // Input variables are passed since TokenizeOperator is not an
                     // filtering operator.
-                    boolean isPartitioned = false;
-                    if (index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                            || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-                        isPartitioned = true;
-                    }
+                    boolean isPartitioned = index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                            || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
 
                     // Create a new logical variable - token
-                    List<LogicalVariable> tokenizeKeyVars = new ArrayList<LogicalVariable>();
-                    List<Mutable<ILogicalExpression>> tokenizeKeyExprs = new ArrayList<Mutable<ILogicalExpression>>();
+                    List<LogicalVariable> tokenizeKeyVars = new ArrayList<>();
+                    List<Mutable<ILogicalExpression>> tokenizeKeyExprs = new ArrayList<>();
                     LogicalVariable tokenVar = context.newVar();
                     tokenizeKeyVars.add(tokenVar);
                     tokenizeKeyExprs
                             .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(tokenVar)));
 
                     // Check the field type of the secondary key.
-                    IAType secondaryKeyType = null;
-                    Pair<IAType, Boolean> keyPairType =
-                            Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recType);
+                    IAType secondaryKeyType;
+                    Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(
+                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0),
+                            recType);
                     secondaryKeyType = keyPairType.first;
 
-                    List<Object> varTypes = new ArrayList<Object>();
+                    List<Object> varTypes = new ArrayList<>();
                     varTypes.add(NonTaggedFormatUtil.getTokenType(secondaryKeyType));
 
                     // If the index is a length-partitioned, then create
@@ -428,64 +332,53 @@
 
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
-                            insertOp.getPrimaryKeyExpressions(), secondaryExpressions, tokenizeKeyVars,
-                            filterExpression, insertOp.getOperation(), insertOp.isBulkload(), isPartitioned, varTypes);
-                    tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+                            primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
+                            tokenizeKeyVars,
+                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            primaryIndexModificationOp.isBulkload(), isPartitioned, varTypes);
+                    tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                     context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
-
-                    IndexInsertDeleteUpsertOperator indexUpdate =
-                            new IndexInsertDeleteUpsertOperator(dataSourceIndex, insertOp.getPrimaryKeyExpressions(),
-                                    tokenizeKeyExprs, filterExpression, insertOp.getOperation(), insertOp.isBulkload(),
-                                    insertOp.getAdditionalNonFilteringExpressions() == null ? 0
-                                            : insertOp.getAdditionalNonFilteringExpressions().size());
+                    replicateOutput = tokenUpdate;
+                    indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
+                            primaryIndexModificationOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression,
+                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                            primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                    : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
-
-                    context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
-
-                    currentTop = indexUpdate;
-                    op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-
                 } else {
                     // When TokenizeOperator is not needed
-                    IndexInsertDeleteUpsertOperator indexUpdate =
-                            new IndexInsertDeleteUpsertOperator(dataSourceIndex, insertOp.getPrimaryKeyExpressions(),
-                                    secondaryExpressions, filterExpression, insertOp.getOperation(),
-                                    insertOp.isBulkload(), insertOp.getAdditionalNonFilteringExpressions() == null ? 0
-                                            : insertOp.getAdditionalNonFilteringExpressions().size());
-
+                    indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
+                            primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
+                            filterExpression,
+                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                            primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                    : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
+                    replicateOutput = indexUpdate;
                     // We add the necessary expressions for upsert
-                    if (insertOp.getOperation() == Kind.UPSERT) {
-                        indexUpdate.setPrevSecondaryKeyExprs(prevSecondaryExpressions);
+                    if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
+                        indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                         if (filteringFields != null) {
-                            indexUpdate.setPrevAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
-                                    new VariableReferenceExpression(insertOp.getPrevFilterVar())));
+                            indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
+                                    new VariableReferenceExpression(
+                                            primaryIndexModificationOp.getBeforeOpFilterVar())));
                         }
                     }
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-
-                    currentTop = indexUpdate;
-                    context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
-
-                    if (insertOp.isBulkload()) {
-                        op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-                    }
-
                 }
-
-            } else if (index.getIndexType() == IndexType.RTREE) {
+            } else {
                 // Get type, dimensions and number of keys
-                Pair<IAType, Boolean> keyPairType =
-                        Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType);
+                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                        secondaryKeyFields.get(0), recType);
                 IAType spatialType = keyPairType.first;
-                boolean isPointMBR =
-                        spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+                boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
+                        || spatialType.getTypeTag() == ATypeTag.POINT3D;
                 int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
                 int numKeys = (isPointMBR && isBulkload) ? dimension : dimension * 2;
                 // Get variables and expressions
-                List<LogicalVariable> keyVarList = new ArrayList<LogicalVariable>();
-                List<Mutable<ILogicalExpression>> keyExprList = new ArrayList<Mutable<ILogicalExpression>>();
+                List<LogicalVariable> keyVarList = new ArrayList<>();
+                List<Mutable<ILogicalExpression>> keyExprList = new ArrayList<>();
                 for (int i = 0; i < numKeys; i++) {
                     LogicalVariable keyVar = context.newVar();
                     keyVarList.add(keyVar);
@@ -499,6 +392,7 @@
                             new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                     keyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
                 }
+                secondaryExpressions.clear();
                 for (LogicalVariable secondaryKeyVar : keyVarList) {
                     secondaryExpressions.add(
                             new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
@@ -514,11 +408,12 @@
                 AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
                 assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                 context.computeAndSetTypeEnvironmentForOperator(assignCoordinates);
+                replicateOutput = assignCoordinates;
                 Mutable<ILogicalExpression> filterExpression = null;
                 AssignOperator originalAssignCoordinates = null;
-                // We do something similar for previous key if the operation is an upsert
-                if (insertOp.getOperation() == Kind.UPSERT) {
-                    List<LogicalVariable> originalKeyVarList = new ArrayList<LogicalVariable>();
+                // We do something similar for beforeOp key if the operation is an upsert
+                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
+                    List<LogicalVariable> originalKeyVarList = new ArrayList<>();
                     List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<>();
                     // we don't do any filtering since nulls are expected here and there
                     for (int i = 0; i < numKeys; i++) {
@@ -526,26 +421,18 @@
                         originalKeyVarList.add(keyVar);
                         AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
                                 FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CREATE_MBR));
-                        createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(prevSecondaryKeyVars.get(0))));
+                        createMBR.getArguments().add(beforeOpSecondaryExpressions.get(0));
                         createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                                 new ConstantExpression(new AsterixConstantValue(new AInt32(dimension)))));
                         createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                                 new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                         originalKeyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
                     }
+                    beforeOpSecondaryExpressions.clear();
                     for (LogicalVariable secondaryKeyVar : originalKeyVarList) {
-                        prevSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
+                        beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
                                 new VariableReferenceExpression(secondaryKeyVar)));
                     }
-                    if (isPointMBR && isBulkload) {
-                        //for PointMBR optimization: see SecondaryRTreeOperationsHelper.buildLoadingJobSpec() and
-                        //createFieldPermutationForBulkLoadOp(int) for more details.
-                        for (LogicalVariable secondaryKeyVar : originalKeyVarList) {
-                            prevSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
-                                    new VariableReferenceExpression(secondaryKeyVar)));
-                        }
-                    }
                     originalAssignCoordinates = new AssignOperator(originalKeyVarList, originalKeyExprList);
                     originalAssignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                     context.computeAndSetTypeEnvironmentForOperator(originalAssignCoordinates);
@@ -557,33 +444,35 @@
                             context.getOutputTypeEnvironment(assignCoordinates), forceFilter);
                 }
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
-                IndexInsertDeleteUpsertOperator indexUpdate =
-                        new IndexInsertDeleteUpsertOperator(dataSourceIndex, insertOp.getPrimaryKeyExpressions(),
-                                secondaryExpressions, filterExpression, insertOp.getOperation(), insertOp.isBulkload(),
-                                insertOp.getAdditionalNonFilteringExpressions() == null ? 0
-                                        : insertOp.getAdditionalNonFilteringExpressions().size());
+                indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
+                        primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+                        primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                        primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                 indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
-                if (insertOp.getOperation() == Kind.UPSERT) {
-                    // set old secondary key expressions
+                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
+                    // set before op secondary key expressions
                     if (filteringFields != null) {
-                        indexUpdate.setPrevAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(insertOp.getPrevFilterVar())));
+                        indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
+                                new VariableReferenceExpression(primaryIndexModificationOp.getBeforeOpFilterVar())));
                     }
                     // set filtering expressions
-                    indexUpdate.setPrevSecondaryKeyExprs(prevSecondaryExpressions);
-                    // assign --> assign previous values --> secondary index upsert
+                    indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
+                    // assign --> assign beforeOp values --> secondary index upsert
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(originalAssignCoordinates));
                 } else {
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                 }
+            }
+            context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
+            if (!primaryIndexModificationOp.isBulkload() || secondaryIndexTotalCnt == 1) {
                 currentTop = indexUpdate;
-                context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
-
-                if (insertOp.isBulkload()) {
-                    // For bulk load, we connect all fanned out insert operator to a single SINK operator
-                    op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-                }
-
+            } else {
+                replicateOp.getOutputs().add(new MutableObject<>(replicateOutput));
+            }
+            if (primaryIndexModificationOp.isBulkload()) {
+                // For bulk load, we connect all fanned out insert operator to a single SINK operator
+                sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
             }
 
         }
@@ -591,183 +480,181 @@
             return false;
         }
 
-        if (!insertOp.isBulkload()) {
+        if (!primaryIndexModificationOp.isBulkload()) {
             // If this is an upsert, we need to
             // Remove the current input to the SINK operator (It is actually already removed above)
-            op0.getInputs().clear();
+            sinkOp.getInputs().clear();
             // Connect the last index update to the SINK
-            op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+            sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
         }
         return true;
     }
 
-    // Merges typed index fields with specified recordType, allowing indexed fields to be optional.
-    // I.e. the type { "personId":int32, "name": string, "address" : { "street": string } } with typed indexes on age:int32, address.state:string
-    //      will be merged into type { "personId":int32, "name": string, "age": int32? "address" : { "street": string, "state": string? } }
-    // Used by open indexes to enforce the type of an indexed record
-    public static ARecordType createEnforcedType(ARecordType initialType, List<Index> indexes)
-            throws AsterixException, AlgebricksException {
-        ARecordType enforcedType = initialType;
+    private LogicalVariable getRecordVar(IOptimizationContext context, AbstractLogicalOperator inputOp,
+            ILogicalExpression recordExpr,
+            int expectedRecordIndex) throws AlgebricksException {
+        if (exprIsRecord(context.getOutputTypeEnvironment(inputOp), recordExpr)) {
+            return ((VariableReferenceExpression) recordExpr).getVariableReference();
+        } else {
+            /**
+             * For the case primary key-assignment expressions are constant
+             * expressions, find assign op that creates record to be
+             * inserted/deleted.
+             */
+            FunctionIdentifier fid = null;
+            AbstractLogicalOperator currentInputOp = inputOp;
+            while (fid != AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) {
+                if (currentInputOp.getInputs().isEmpty()) {
+                    return null;
+                }
+                currentInputOp = (AbstractLogicalOperator) currentInputOp.getInputs().get(0).getValue();
+                if (currentInputOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+                    continue;
+                }
+                AssignOperator assignOp = (AssignOperator) currentInputOp;
+                ILogicalExpression assignExpr = assignOp.getExpressions().get(expectedRecordIndex).getValue();
+                if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) assignOp.getExpressions()
+                            .get(expectedRecordIndex).getValue();
+                    fid = funcExpr.getFunctionIdentifier();
+                }
+            }
+            return ((AssignOperator) currentInputOp).getVariables().get(0);
+        }
+    }
+
+    private boolean exprIsRecord(IVariableTypeEnvironment typeEnvironment, ILogicalExpression recordExpr)
+            throws AlgebricksException {
+        if (recordExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            IAType type = (IAType) typeEnvironment.getType(recordExpr);
+            return type != null && type.getTypeTag() == ATypeTag.RECORD;
+        }
+        return false;
+    }
+
+    private ILogicalOperator injectFieldAccessesForIndexes(IOptimizationContext context, Dataset dataset,
+            List<Index> indexes, Map<IndexFieldId, LogicalVariable> fieldAccessVars, ARecordType recType,
+            ARecordType metaType, LogicalVariable recordVar, LogicalVariable metaVar, ILogicalOperator currentTop,
+            boolean afterOp) throws AlgebricksException {
+        List<LogicalVariable> vars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
         for (Index index : indexes) {
-            if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) {
+            if (index.isPrimaryIndex()) {
                 continue;
             }
-            if (index.hasMetaFields()) {
-                throw new AlgebricksException("Indexing an open field is only supported on the record part");
-            }
+            List<IAType> skTypes = index.getKeyFieldTypes();
+            List<List<String>> skNames = index.getKeyFieldNames();
+            List<Integer> indicators = index.getKeyFieldSourceIndicators();
             for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
-                Stack<Pair<ARecordType, String>> nestedTypeStack = new Stack<Pair<ARecordType, String>>();
-                List<String> splits = index.getKeyFieldNames().get(i);
-                ARecordType nestedFieldType = enforcedType;
-                boolean openRecords = false;
-                String bridgeName = nestedFieldType.getTypeName();
-                int j;
-                // Build the stack for the enforced type
-                for (j = 1; j < splits.size(); j++) {
-                    nestedTypeStack.push(new Pair<ARecordType, String>(nestedFieldType, splits.get(j - 1)));
-                    bridgeName = nestedFieldType.getTypeName();
-                    nestedFieldType = (ARecordType) enforcedType.getSubFieldType(splits.subList(0, j));
-                    if (nestedFieldType == null) {
-                        openRecords = true;
-                        break;
-                    }
+                IndexFieldId indexFieldId = new IndexFieldId(indicators.get(i), skNames.get(i));
+                if (fieldAccessVars.containsKey(indexFieldId)) {
+                    // already handled in a different index
+                    continue;
                 }
-                if (openRecords == true) {
-                    // create the smallest record
-                    enforcedType = new ARecordType(splits.get(splits.size() - 2),
-                            new String[] { splits.get(splits.size() - 1) },
-                            new IAType[] { AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i)) }, true);
-                    // create the open part of the nested field
-                    for (int k = splits.size() - 3; k > (j - 2); k--) {
-                        enforcedType = new ARecordType(splits.get(k), new String[] { splits.get(k + 1) },
-                                new IAType[] { AUnionType.createUnknownableType(enforcedType) }, true);
-                    }
-                    // Bridge the gap
-                    Pair<ARecordType, String> gapPair = nestedTypeStack.pop();
-                    ARecordType parent = gapPair.first;
-
-                    IAType[] parentFieldTypes = ArrayUtils.addAll(parent.getFieldTypes().clone(),
-                            new IAType[] { AUnionType.createUnknownableType(enforcedType) });
-                    enforcedType = new ARecordType(bridgeName,
-                            ArrayUtils.addAll(parent.getFieldNames(), enforcedType.getTypeName()), parentFieldTypes,
-                            true);
+                ARecordType sourceType = dataset.hasMetaPart()
+                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recType : metaType : recType;
+                LogicalVariable sourceVar = dataset.hasMetaPart()
+                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar
+                        : recordVar;
+                LogicalVariable fieldVar = context.newVar();
+                // create record variable ref
+                Mutable<ILogicalExpression> varRef =
+                        new MutableObject<>(new VariableReferenceExpression(sourceVar));
+                IAType fieldType = sourceType.getSubFieldType(indexFieldId.fieldName);
+                AbstractFunctionCallExpression theFieldAccessFunc;
+                if (fieldType == null) {
+                    // Open field. must prevent inlining to maintain the cast before the primaryOp and
+                    // make handling of records with incorrect value type for this field easier and cleaner
+                    context.addNotToBeInlinedVar(fieldVar);
+                    // create field access
+                    AbstractFunctionCallExpression fieldAccessFunc =
+                            getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName);
+                    // create cast
+                    theFieldAccessFunc = new ScalarFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE));
+                    // The first argument is the field
+                    theFieldAccessFunc.getArguments()
+                            .add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
+                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i),
+                            BuiltinType.ANY);
                 } else {
-                    //Schema is closed all the way to the field
-                    //enforced fields are either null or strongly typed
-                    LinkedHashMap<String, IAType> recordNameTypesMap = createRecordNameTypeMap(nestedFieldType);
-                    // if a an enforced field already exists and the type is correct
-                    IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1));
-                    if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION
-                            && ((AUnionType) enforcedFieldType).isUnknownableType()) {
-                        enforcedFieldType = ((AUnionType) enforcedFieldType).getActualType();
-                    }
-                    if (enforcedFieldType != null && !ATypeHierarchy.canPromote(enforcedFieldType.getTypeTag(),
-                            index.getKeyFieldTypes().get(i).getTypeTag())) {
-                        throw new AlgebricksException("Cannot enforce field " + index.getKeyFieldNames().get(i)
-                                + " to have type " + index.getKeyFieldTypes().get(i));
-                    }
-                    if (enforcedFieldType == null) {
-                        recordNameTypesMap.put(splits.get(splits.size() - 1),
-                                AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i)));
-                    }
-                    enforcedType = new ARecordType(nestedFieldType.getTypeName(),
-                            recordNameTypesMap.keySet().toArray(new String[recordNameTypesMap.size()]),
-                            recordNameTypesMap.values().toArray(new IAType[recordNameTypesMap.size()]),
-                            nestedFieldType.isOpen());
+                    // Get the desired field position
+                    int pos = indexFieldId.fieldName.size() > 1 ? -1
+                            : sourceType.getFieldIndex(indexFieldId.fieldName.get(0));
+                    // Field not found --> This is either an open field or a nested field. it can't be accessed by index
+                    theFieldAccessFunc =
+                            (pos == -1) ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
+                                    : getClosedFieldAccessFunction(varRef, pos);
                 }
-
-                // Create the enforcedtype for the nested fields in the schema, from the ground up
-                if (nestedTypeStack.size() > 0) {
-                    while (!nestedTypeStack.isEmpty()) {
-                        Pair<ARecordType, String> nestedTypePair = nestedTypeStack.pop();
-                        ARecordType nestedRecType = nestedTypePair.first;
-                        IAType[] nestedRecTypeFieldTypes = nestedRecType.getFieldTypes().clone();
-                        nestedRecTypeFieldTypes[nestedRecType.getFieldIndex(nestedTypePair.second)] = enforcedType;
-                        enforcedType = new ARecordType(nestedRecType.getTypeName() + "_enforced",
-                                nestedRecType.getFieldNames(), nestedRecTypeFieldTypes, nestedRecType.isOpen());
-                    }
-                }
+                vars.add(fieldVar);
+                exprs.add(new MutableObject<ILogicalExpression>(theFieldAccessFunc));
+                fieldAccessVars.put(indexFieldId, fieldVar);
             }
         }
-        return enforcedType;
+        // AssignOperator assigns secondary keys to their vars
+        AssignOperator castedFieldAssignOperator = new AssignOperator(vars, exprs);
+        return introduceNewOp(context, currentTop, castedFieldAssignOperator, afterOp);
     }
 
-    private static LinkedHashMap<String, IAType> createRecordNameTypeMap(ARecordType nestedFieldType) {
-        LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<>();
-        for (int j = 0; j < nestedFieldType.getFieldNames().length; j++) {
-            recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]);
-        }
-        return recordNameTypesMap;
-    }
-
-    /***
-     * This method takes a list of {fields}: a subset of {recordFields}, the original record variable
-     * and populate expressions with expressions which evaluate to those fields (using field access functions) and
-     * variables to represent them
-     *
-     * @param fields
-     *            desired fields
-     * @param recordFields
-     *            all the record fields
-     * @param recordVar
-     *            the record variable
-     * @param expressions
-     * @param vars
-     * @param context
-     * @throws AlgebricksException
-     */
-    @SuppressWarnings("unchecked")
-    private void prepareVarAndExpression(List<String> fields, String[] recordFields, LogicalVariable recordVar,
-            List<Mutable<ILogicalExpression>> expressions, List<LogicalVariable> vars, IOptimizationContext context)
-            throws AlgebricksException {
-        // Get a reference to the record variable
-        Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(recordVar));
-        // Get the desired field position
-        int pos = -1;
-        if (fields.size() == 1) {
-            for (int j = 0; j < recordFields.length; j++) {
-                if (recordFields[j].equals(fields.get(0))) {
-                    pos = j;
-                    break;
-                }
-            }
-        }
-        // Field not found --> This is either an open field or a nested field. it can't be accessed by index
-        AbstractFunctionCallExpression func;
-        if (pos == -1) {
-            if (fields.size() > 1) {
-                AOrderedList fieldList = new AOrderedList(new AOrderedListType(BuiltinType.ASTRING, null));
-                for (int i = 0; i < fields.size(); i++) {
-                    fieldList.add(new AString(fields.get(i)));
-                }
-                Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
-                        new ConstantExpression(new AsterixConstantValue(fieldList)));
-                // Create an expression for the nested case
-                func = new ScalarFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED), varRef, fieldRef);
-            } else {
-                Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
-                        new ConstantExpression(new AsterixConstantValue(new AString(fields.get(0)))));
-                // Create an expression for the open field case (By name)
-                func = new ScalarFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
-            }
+    private static ILogicalOperator introduceNewOp(IOptimizationContext context, ILogicalOperator currentTopOp,
+            ILogicalOperator newOp, boolean afterOp) throws AlgebricksException {
+        if (afterOp) {
+            newOp.getInputs().add(new MutableObject<>(currentTopOp));
+            context.computeAndSetTypeEnvironmentForOperator(newOp);
+            return newOp;
         } else {
-            // Assumes the indexed field is in the closed portion of the type.
-            Mutable<ILogicalExpression> indexRef = new MutableObject<ILogicalExpression>(
-                    new ConstantExpression(new AsterixConstantValue(new AInt32(pos))));
-            func = new ScalarFunctionCallExpression(
-                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX), varRef, indexRef);
+            newOp.getInputs().addAll(currentTopOp.getInputs());
+            currentTopOp.getInputs().clear();
+            currentTopOp.getInputs().add(new MutableObject<>(newOp));
+            context.computeAndSetTypeEnvironmentForOperator(newOp);
+            context.computeAndSetTypeEnvironmentForOperator(currentTopOp);
+            return currentTopOp;
         }
-        expressions.add(new MutableObject<ILogicalExpression>(func));
-        LogicalVariable newVar = context.newVar();
-        vars.add(newVar);
     }
 
-    @SuppressWarnings("unchecked")
+    private static AbstractFunctionCallExpression getClosedFieldAccessFunction(Mutable<ILogicalExpression> varRef,
+            int position) {
+        Mutable<ILogicalExpression> indexRef = new MutableObject<>(
+                new ConstantExpression(new AsterixConstantValue(new AInt32(position))));
+        return new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX), varRef, indexRef);
+    }
+
+    private static AbstractFunctionCallExpression getOpenOrNestedFieldAccessFunction(Mutable<ILogicalExpression> varRef,
+            List<String> fields) {
+        ScalarFunctionCallExpression func;
+        if (fields.size() > 1) {
+            IAObject fieldList = stringListToAOrderedList(fields);
+            Mutable<ILogicalExpression> fieldRef = constantToMutableLogicalExpression(fieldList);
+            // Create an expression for the nested case
+            func = new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED), varRef, fieldRef);
+        } else {
+            IAObject fieldList = new AString(fields.get(0));
+            Mutable<ILogicalExpression> fieldRef = constantToMutableLogicalExpression(fieldList);
+            // Create an expression for the open field case (By name)
+            func = new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
+        }
+        return func;
+    }
+
+    private static AOrderedList stringListToAOrderedList(List<String> fields) {
+        AOrderedList fieldList = new AOrderedList(new AOrderedListType(BuiltinType.ASTRING, null));
+        for (int i = 0; i < fields.size(); i++) {
+            fieldList.add(new AString(fields.get(i)));
+        }
+        return fieldList;
+    }
+
+    private static Mutable<ILogicalExpression> constantToMutableLogicalExpression(IAObject constantObject) {
+        return new MutableObject<>(
+                new ConstantExpression(new AsterixConstantValue(constantObject)));
+    }
+
     private Mutable<ILogicalExpression> createFilterExpression(List<LogicalVariable> secondaryKeyVars,
             IVariableTypeEnvironment typeEnv, boolean forceFilter) throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> filterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> filterExpressions = new ArrayList<>();
         // Add 'is not null' to all nullable secondary index keys as a filtering
         // condition.
         for (LogicalVariable secondaryKeyVar : secondaryKeyVars) {
@@ -775,26 +662,50 @@
             if (!NonTaggedFormatUtil.isOptional(secondaryKeyType) && !forceFilter) {
                 continue;
             }
-            ScalarFunctionCallExpression isUnknownFuncExpr =
-                    new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_UNKOWN),
-                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
-            ScalarFunctionCallExpression notFuncExpr =
-                    new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT),
-                            new MutableObject<ILogicalExpression>(isUnknownFuncExpr));
+            ScalarFunctionCallExpression isUnknownFuncExpr = new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_UNKOWN),
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
+            ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT),
+                    new MutableObject<ILogicalExpression>(isUnknownFuncExpr));
             filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
         }
         // No nullable secondary keys.
         if (filterExpressions.isEmpty()) {
             return null;
         }
-        Mutable<ILogicalExpression> filterExpression = null;
+        Mutable<ILogicalExpression> filterExpression;
         if (filterExpressions.size() > 1) {
             // Create a conjunctive condition.
-            filterExpression = new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+            filterExpression = new MutableObject<>(new ScalarFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.AND), filterExpressions));
         } else {
             filterExpression = filterExpressions.get(0);
         }
         return filterExpression;
     }
+
+    private class IndexFieldId {
+        private int indicator;
+        private List<String> fieldName;
+
+        public IndexFieldId(int indicator, List<String> fieldName) {
+            this.indicator = indicator;
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * indicator + fieldName.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o instanceof IndexFieldId) {
+                IndexFieldId oIndexFieldId = (IndexFieldId) o;
+                return indicator == oIndexFieldId.indicator && fieldName.equals(oIndexFieldId.fieldName);
+            }
+            return false;
+        }
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index 8aefd1a..0c36d0b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -110,7 +110,7 @@
                                 FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
                         // argument is the previous record
                         isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(insertDeleteUpsertOperator.getPrevRecordVar())));
+                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
                         orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
                         orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java
index eac35cd..ba79534 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java
@@ -506,9 +506,10 @@
      *            the expression reference
      * @param argExpr
      *            the original expression
+     * @throws AlgebricksException
      */
     private static void injectCastFunction(IFunctionInfo funcInfo, IAType reqType, IAType inputType,
-            Mutable<ILogicalExpression> exprRef, ILogicalExpression argExpr) {
+            Mutable<ILogicalExpression> exprRef, ILogicalExpression argExpr) throws AlgebricksException {
         ScalarFunctionCallExpression cast = new ScalarFunctionCallExpression(funcInfo);
         cast.getArguments().add(new MutableObject<ILogicalExpression>(argExpr));
         exprRef.setValue(cast);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b430807..90bf6f6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -25,10 +25,13 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.rmi.RemoteException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -152,9 +155,10 @@
 import org.apache.asterix.metadata.utils.MetadataLockManager;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -175,6 +179,7 @@
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.util.FlushDatasetUtils;
 import org.apache.asterix.util.JobUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -976,7 +981,7 @@
 
             ARecordType enforcedType = null;
             if (stmtCreateIndex.isEnforced()) {
-                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType,
+                enforcedType = createEnforcedType(aRecordType,
                         Lists.newArrayList(index));
             }
 
@@ -2473,7 +2478,7 @@
                     dataverseName);
             jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
-            ARecordType enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(
+            ARecordType enforcedType = createEnforcedType(
                     aRecordType, indexes);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
@@ -3124,4 +3129,105 @@
         rewriter.rewrite(stmt);
     }
 
+    /*
+     * Merges typed index fields with specified recordType, allowing indexed fields to be optional.
+     * I.e. the type { "personId":int32, "name": string, "address" : { "street": string } } with typed indexes
+     * on age:int32, address.state:string will be merged into type { "personId":int32, "name": string,
+     * "age": int32? "address" : { "street": string, "state": string? } } Used by open indexes to enforce
+     * the type of an indexed record
+     */
+    private static ARecordType createEnforcedType(ARecordType initialType, List<Index> indexes)
+            throws AlgebricksException {
+        ARecordType enforcedType = initialType;
+        for (Index index : indexes) {
+            if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) {
+                continue;
+            }
+            if (index.hasMetaFields()) {
+                throw new AlgebricksException("Indexing an open field is only supported on the record part");
+            }
+            for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
+                Deque<Pair<ARecordType, String>> nestedTypeStack = new ArrayDeque<>();
+                List<String> splits = index.getKeyFieldNames().get(i);
+                ARecordType nestedFieldType = enforcedType;
+                boolean openRecords = false;
+                String bridgeName = nestedFieldType.getTypeName();
+                int j;
+                // Build the stack for the enforced type
+                for (j = 1; j < splits.size(); j++) {
+                    nestedTypeStack.push(new Pair<ARecordType, String>(nestedFieldType, splits.get(j - 1)));
+                    bridgeName = nestedFieldType.getTypeName();
+                    nestedFieldType = (ARecordType) enforcedType.getSubFieldType(splits.subList(0, j));
+                    if (nestedFieldType == null) {
+                        openRecords = true;
+                        break;
+                    }
+                }
+                if (openRecords) {
+                    // create the smallest record
+                    enforcedType = new ARecordType(splits.get(splits.size() - 2),
+                            new String[] { splits.get(splits.size() - 1) },
+                            new IAType[] { AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i)) }, true);
+                    // create the open part of the nested field
+                    for (int k = splits.size() - 3; k > (j - 2); k--) {
+                        enforcedType = new ARecordType(splits.get(k), new String[] { splits.get(k + 1) },
+                                new IAType[] { AUnionType.createUnknownableType(enforcedType) }, true);
+                    }
+                    // Bridge the gap
+                    Pair<ARecordType, String> gapPair = nestedTypeStack.pop();
+                    ARecordType parent = gapPair.first;
+
+                    IAType[] parentFieldTypes = ArrayUtils.addAll(parent.getFieldTypes().clone(),
+                            new IAType[] { AUnionType.createUnknownableType(enforcedType) });
+                    enforcedType = new ARecordType(bridgeName,
+                            ArrayUtils.addAll(parent.getFieldNames(), enforcedType.getTypeName()), parentFieldTypes,
+                            true);
+                } else {
+                    //Schema is closed all the way to the field
+                    //enforced fields are either null or strongly typed
+                    LinkedHashMap<String, IAType> recordNameTypesMap = createRecordNameTypeMap(nestedFieldType);
+                    // if a an enforced field already exists and the type is correct
+                    IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1));
+                    if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION
+                            && ((AUnionType) enforcedFieldType).isUnknownableType()) {
+                        enforcedFieldType = ((AUnionType) enforcedFieldType).getActualType();
+                    }
+                    if (enforcedFieldType != null && !ATypeHierarchy.canPromote(enforcedFieldType.getTypeTag(),
+                            index.getKeyFieldTypes().get(i).getTypeTag())) {
+                        throw new AlgebricksException("Cannot enforce field " + index.getKeyFieldNames().get(i)
+                                + " to have type " + index.getKeyFieldTypes().get(i));
+                    }
+                    if (enforcedFieldType == null) {
+                        recordNameTypesMap.put(splits.get(splits.size() - 1),
+                                AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i)));
+                    }
+                    enforcedType = new ARecordType(nestedFieldType.getTypeName(),
+                            recordNameTypesMap.keySet().toArray(new String[recordNameTypesMap.size()]),
+                            recordNameTypesMap.values().toArray(new IAType[recordNameTypesMap.size()]),
+                            nestedFieldType.isOpen());
+                }
+
+                // Create the enforced type for the nested fields in the schema, from the ground up
+                if (!nestedTypeStack.isEmpty()) {
+                    while (!nestedTypeStack.isEmpty()) {
+                        Pair<ARecordType, String> nestedTypePair = nestedTypeStack.pop();
+                        ARecordType nestedRecType = nestedTypePair.first;
+                        IAType[] nestedRecTypeFieldTypes = nestedRecType.getFieldTypes().clone();
+                        nestedRecTypeFieldTypes[nestedRecType.getFieldIndex(nestedTypePair.second)] = enforcedType;
+                        enforcedType = new ARecordType(nestedRecType.getTypeName() + "_enforced",
+                                nestedRecType.getFieldNames(), nestedRecTypeFieldTypes, nestedRecType.isOpen());
+                    }
+                }
+            }
+        }
+        return enforcedType;
+    }
+
+    private static LinkedHashMap<String, IAType> createRecordNameTypeMap(ARecordType nestedFieldType) {
+        LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<>();
+        for (int j = 0; j < nestedFieldType.getFieldNames().length; j++) {
+            recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]);
+        }
+        return recordNameTypesMap;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan
index c65c71c..4e6eef4 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan
@@ -4,18 +4,17 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- BTREE_SEARCH  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STABLE_SORT [$$11(ASC)]  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
-                                      -- UNNEST  |UNPARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$11(ASC)]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                                    -- UNNEST  |UNPARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan
index 25b3396..a96a3be 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan
@@ -4,24 +4,23 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- STREAM_SELECT  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- BTREE_SEARCH  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$13(ASC)]  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- BTREE_SEARCH  |PARTITIONED|
-                                                -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                  -- UNNEST  |UNPARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BTREE_SEARCH  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STABLE_SORT [$$13(ASC)]  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- BTREE_SEARCH  |PARTITIONED|
+                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                -- UNNEST  |UNPARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
index 1cb56c9..1f686bd2 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
@@ -4,23 +4,22 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- HASH_PARTITION_EXCHANGE [$$8]  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- HYBRID_HASH_JOIN [$$11][$$9]  |PARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
-                                  -- UNNEST  |UNPARTITIONED|
-                                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$9]  |PARTITIONED|
-                                  -- ASSIGN  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$8]  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$11][$$9]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                                -- UNNEST  |UNPARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$9]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-and-scan-dataset-with-index.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-and-scan-dataset-with-index.plan
index 8bc296b..9623371 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-and-scan-dataset-with-index.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-and-scan-dataset-with-index.plan
@@ -4,17 +4,16 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- HASH_PARTITION_EXCHANGE [$$10]  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$10]  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
                             -- ASSIGN  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
index ca4a6c2..317b163 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
@@ -17,16 +17,15 @@
           -- STREAM_PROJECT  |PARTITIONED|
             -- ASSIGN  |PARTITIONED|
               -- STREAM_PROJECT  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- INSERT_DELETE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- MATERIALIZE  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- BTREE_SEARCH  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- INSERT_DELETE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- MATERIALIZE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BTREE_SEARCH  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
index 5806723..0ca0482 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
@@ -4,21 +4,19 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INDEX_INSERT_DELETE  |PARTITIONED|
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  -- INDEX_INSERT_DELETE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
+                      -- INSERT_DELETE  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- INSERT_DELETE  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
                               -- ASSIGN  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ASSIGN  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan
index 31de832..c469ca8 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan
@@ -4,16 +4,15 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_SELECT  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan
index 0c406e2..49569d4 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan
@@ -6,16 +6,15 @@
           -- STREAM_PROJECT  |PARTITIONED|
             -- ASSIGN  |PARTITIONED|
               -- STREAM_PROJECT  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- INSERT_DELETE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- MATERIALIZE  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_SELECT  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- INSERT_DELETE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- MATERIALIZE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan
index 31de832..1ba839d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan
@@ -4,16 +4,15 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_SELECT  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan
index 31de832..1ba839d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan
@@ -4,16 +4,15 @@
       -- INDEX_INSERT_DELETE  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- STREAM_PROJECT  |PARTITIONED|
-            -- ASSIGN  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- INSERT_DELETE  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- MATERIALIZE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- STREAM_SELECT  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.1.ddl.aql
new file mode 100644
index 0000000..92d8f7d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.1.ddl.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Delete from enforced index and validate deletion
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type OrderOpenType as open {
+  o_orderkey: int64
+}
+
+create dataset OrdersOpen(OrderOpenType)
+primary key o_orderkey;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.2.update.aql
new file mode 100644
index 0000000..ad05499
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.2.update.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Delete from enforced index and validate deletion
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+insert into dataset OrdersOpen (
+  {"o_orderkey": 1,
+  "o_custkey": 1}
+)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.3.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.3.ddl.aql
new file mode 100644
index 0000000..af5b71d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.3.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Delete from enforced index and validate deletion
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+create index idx_Orders_Custkey on
+OrdersOpen(o_custkey:int32?) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.4.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.4.update.aql
new file mode 100644
index 0000000..588b9e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.4.update.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Delete from enforced index and validate deletion
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+delete $v from dataset OrdersOpen
+where $v. o_orderkey = 1;
+
+insert into dataset OrdersOpen (
+  {"o_orderkey": 1,
+   "o_custkey": 2}
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.5.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.5.query.aql
new file mode 100644
index 0000000..e6ac100
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.5.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Delete from enforced index and validate deletion
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+let $l :=  for $o in dataset('OrdersOpen')
+where $o.o_custkey >=-1
+return $o.o_orderKey
+return count($l);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.1.ddl.aql
new file mode 100644
index 0000000..f710221
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.1.ddl.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Upsert from enforced index and validate result
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type OrderOpenType as open {
+  o_orderkey: int64
+}
+
+create dataset OrdersOpen(OrderOpenType)
+primary key o_orderkey;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.2.update.aql
new file mode 100644
index 0000000..1e4c7cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.2.update.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Upsert from enforced index and validate result
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+insert into dataset OrdersOpen (
+  {"o_orderkey": 1,
+   "o_custkey": 1}
+)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.3.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.3.ddl.aql
new file mode 100644
index 0000000..0cf0fab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.3.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Upsert from enforced index and validate result
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+create index idx_Orders_Custkey on
+OrdersOpen(o_custkey:int32?) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.4.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.4.update.aql
new file mode 100644
index 0000000..b7c6ec8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.4.update.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Upsert from enforced index and validate result
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+upsert into dataset OrdersOpen (
+  {"o_orderkey": 1,
+   "o_custkey": 2}
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.5.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.5.query.aql
new file mode 100644
index 0000000..7e5ef7c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.5.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Upsert from enforced index and validate result
+* Expected Res : Success
+* Date         : 22 Aug 2016
+*/
+use dataverse test;
+
+let $l :=  for $o in dataset('OrdersOpen')
+where $o.o_custkey >=-1
+return $o.o_orderKey
+return count($l);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.5.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/open-index-enforced/type-checking/enforced-type-delete/enforced-type-delete.5.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.5.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/open-index-enforced/type-checking/enforced-type-upsert/enforced-type-upsert.5.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index b00d995..103d7bd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -3511,6 +3511,18 @@
         </compilation-unit>
       </test-case>
     </test-group>
+    <test-group name="open-index-enforced/type-checking">
+      <test-case FilePath="open-index-enforced/type-checking">
+        <compilation-unit name="enforced-type-delete">
+          <output-dir compare="Text">enforced-type-delete</output-dir>
+        </compilation-unit>
+      </test-case>
+      <test-case FilePath="open-index-enforced/type-checking">
+        <compilation-unit name="enforced-type-upsert">
+          <output-dir compare="Text">enforced-type-upsert</output-dir>
+        </compilation-unit>
+      </test-case>
+    </test-group>
   </test-group>
   <test-group name="nested-open-index">
     <test-group name="nested-open-index/index-join">
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index bdf9ed0..f33a2b6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -37,6 +37,8 @@
 public class Index implements IMetadataEntity<Index>, Comparable<Index> {
 
     private static final long serialVersionUID = 1L;
+    public static final int RECORD_INDICATOR = 0;
+    public static final int META_INDICATOR = 1;
 
     private final String dataverseName;
     // Enforced to be unique within a dataverse.
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/base/TypeCastUtils.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/base/TypeCastUtils.java
index 04883e4..4bdcbba 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/base/TypeCastUtils.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/base/TypeCastUtils.java
@@ -19,7 +19,11 @@
 
 package org.apache.asterix.om.typecomputer.base;
 
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 
 public class TypeCastUtils {
@@ -27,14 +31,20 @@
     private TypeCastUtils() {
     }
 
-    public static boolean setRequiredAndInputTypes(AbstractFunctionCallExpression expr, IAType requiredRecordType,
-            IAType inputRecordType) {
+    public static boolean setRequiredAndInputTypes(AbstractFunctionCallExpression expr, IAType requiredType,
+            IAType inputType) throws AlgebricksException {
         boolean changed = false;
         Object[] opaqueParameters = expr.getOpaqueParameters();
         if (opaqueParameters == null) {
             opaqueParameters = new Object[2];
-            opaqueParameters[0] = requiredRecordType;
-            opaqueParameters[1] = inputRecordType;
+            opaqueParameters[0] = requiredType;
+            opaqueParameters[1] = inputType;
+            if (TypeComputeUtils.getActualType(inputType).getTypeTag() != ATypeTag.ANY
+                    && TypeComputeUtils.getActualType(requiredType).getTypeTag() != ATypeTag.ANY
+                    && !ATypeHierarchy.isCompatible(requiredType.getTypeTag(),
+                            TypeComputeUtils.getActualType(inputType).getTypeTag())) {
+                throw new AlgebricksException(inputType + " can't be casted to " + requiredType);
+            }
             expr.setOpaqueParameters(opaqueParameters);
             changed = true;
         }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/CastTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/CastTypeComputer.java
index 64f85cb..e4a751b 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/CastTypeComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/CastTypeComputer.java
@@ -19,28 +19,22 @@
 
 package org.apache.asterix.om.typecomputer.impl;
 
-import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 
 /**
  * The type computer for the cast-list function
- *
- * @author yingyib
  */
-public class CastTypeComputer implements IResultTypeComputer {
+public class CastTypeComputer extends AbstractResultTypeComputer {
 
     public static final CastTypeComputer INSTANCE = new CastTypeComputer();
 
     @Override
-    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
-            IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
-        ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expression;
-        return TypeCastUtils.getRequiredType(funcExpr);
+    protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
+        return TypeCastUtils.getRequiredType((AbstractFunctionCallExpression) expr);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index d47492c..02765f1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -173,7 +173,7 @@
         return prevSecondaryKeyExprs;
     }
 
-    public void setPrevSecondaryKeyExprs(List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs) {
+    public void setBeforeOpSecondaryKeyExprs(List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs) {
         this.prevSecondaryKeyExprs = prevSecondaryKeyExprs;
     }
 
@@ -181,7 +181,8 @@
         return prevAdditionalFilteringExpression;
     }
 
-    public void setPrevAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
+    public void
+            setBeforeOpAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
         this.prevAdditionalFilteringExpression = prevAdditionalFilteringExpression;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 7d6c299..5dc327a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -216,7 +216,7 @@
         return additionalFilteringExpressions;
     }
 
-    public LogicalVariable getPrevRecordVar() {
+    public LogicalVariable getBeforeOpRecordVar() {
         return prevRecordVar;
     }
 
@@ -228,7 +228,7 @@
         prevRecordType = recordType;
     }
 
-    public LogicalVariable getPrevFilterVar() {
+    public LogicalVariable getBeforeOpFilterVar() {
         return prevFilterVar;
     }
 
@@ -244,7 +244,7 @@
         this.prevFilterType = prevFilterType;
     }
 
-    public List<LogicalVariable> getPrevAdditionalNonFilteringVars() {
+    public List<LogicalVariable> getBeforeOpAdditionalNonFilteringVars() {
         return prevAdditionalNonFilteringVars;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e966406..0d50dc2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -46,12 +46,12 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -68,8 +68,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -436,6 +436,9 @@
 
     @Override
     public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> outputOp : op.getOutputs()) {
+            VariableUtilities.getUsedVariables(outputOp.getValue(), usedVariables);
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index fe8e044..96d06c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
@@ -63,7 +64,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -150,7 +150,8 @@
 
     @Override
     public Void visitInnerJoinOperator(InnerJoinOperator op, Integer indent) throws AlgebricksException {
-        addIndent(indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent)).append(")");
+        addIndent(indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent)).
+        append(")");
         return null;
     }
 
@@ -381,9 +382,10 @@
         pprintExprList(op.getPrimaryKeyExpressions(), indent);
         if (op.getOperation() == Kind.UPSERT) {
             buffer.append(
-                    " out: ([record-before-upsert:" + op.getPrevRecordVar()
-                            + ((op.getPrevAdditionalNonFilteringVars() != null)
-                                    ? (", additional-before-upsert: " + op.getPrevAdditionalNonFilteringVars()) : "")
+                    " out: ([record-before-upsert:" + op.getBeforeOpRecordVar()
+                            + ((op.getBeforeOpAdditionalNonFilteringVars() != null)
+                                    ? (", additional-before-upsert: " + op.getBeforeOpAdditionalNonFilteringVars())
+                                    : "")
                             + "]) ");
         }
         if (op.isBulkload()) {