Fix Indexing on Open fields and Meta fields

This change fix the following cases:
1. Build a secondary index on a meta field: success
2. Build an open index on a meta field: failure
3. Build a secondary index on an open field in record part: success

Testing ingestion and querying are working correctly for these cases.

Change-Id: I6195149940f150250a65f2515e9ac9d6de2a33f9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/930
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 6205962..f5cfa2b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -102,17 +102,30 @@
         InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) op1;
         boolean isBulkload = insertOp.isBulkload();
         ILogicalExpression recordExpr = insertOp.getPayloadExpression().getValue();
+        List<Mutable<ILogicalExpression>> metaExprs = insertOp.getAdditionalNonFilteringExpressions();
         LogicalVariable recordVar = null;
+        LogicalVariable metaVar = null;
         List<LogicalVariable> usedRecordVars = new ArrayList<>();
         /** assume the payload is always a single variable expression */
         recordExpr.getUsedVariables(usedRecordVars);
         if (usedRecordVars.size() == 1) {
             recordVar = usedRecordVars.get(0);
         }
+        if (metaExprs != null) {
+            List<LogicalVariable> metaVars = new ArrayList<>();
+            for (Mutable<ILogicalExpression> expr : metaExprs) {
+                expr.getValue().getUsedVariables(metaVars);
+            }
+            if (metaVars.size() > 1) {
+                throw new AlgebricksException(
+                        "Number of meta fields can't be more than 1. Number of meta fields found = " + metaVars.size());
+            }
+            metaVar = metaVars.get(0);
+        }
 
         /**
-         * op2 is the assign operator which extract primary keys from the record
-         * variable
+         * op2 is the assign operator which extracts primary keys from the input
+         * variables (record or meta)
          */
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
 
@@ -169,6 +182,11 @@
             throw new AlgebricksException("Only record types can be indexed.");
         }
         ARecordType recType = (ARecordType) itemType;
+        // meta type
+        ARecordType metaType = null;
+        if (dataset.hasMetaPart()) {
+            metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+        }
         // recType might be replaced with enforced record type and we want to keep a reference to the original record
         // type
         ARecordType originalRecType = recType;
@@ -228,7 +246,7 @@
         if (insertOp.getOperation() == Kind.INSERT || insertOp.getOperation() == Kind.UPSERT) {
             try {
                 DatasetDataSource ds = (DatasetDataSource) (insertOp.getDataSource());
-                ARecordType insertRecType = (ARecordType) ds.getSchemaTypes()[ds.getSchemaTypes().length - 1];
+                ARecordType insertRecType = (ARecordType) ds.getItemType();
                 // A new variable which represents the casted record
                 LogicalVariable castedRecVar = context.newVar();
                 // create the expected record type = the original + the optional open field
@@ -281,11 +299,19 @@
             List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
             List<IAType> secondaryKeyTypes = index.getKeyFieldTypes();
             List<LogicalVariable> secondaryKeyVars = new ArrayList<LogicalVariable>();
+            List<Integer> indicators = index.getKeyFieldSourceIndicators();
             List<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
             List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
 
-            for (List<String> secondaryKey : secondaryKeyFields) {
-                prepareVarAndExpression(secondaryKey, recType.getFieldNames(), enforcedRecordVar, expressions,
+            for (int i = 0; i < secondaryKeyFields.size(); i++) {
+                List<String> secondaryKey = secondaryKeyFields.get(i);
+                ARecordType sourceType = recType;
+                LogicalVariable sourceVar = enforcedRecordVar;
+                if (dataset.hasMetaPart()) {
+                    sourceType = indicators.get(i).intValue() == 0 ? recType : metaType;
+                    sourceVar = indicators.get(i).intValue() == 0 ? enforcedRecordVar : metaVar;
+                }
+                prepareVarAndExpression(secondaryKey, sourceType.getFieldNames(), sourceVar, expressions,
                         secondaryKeyVars, context);
             }
             // Used with upsert operation
@@ -298,14 +324,18 @@
                 prevSecondaryKeyVars = new ArrayList<LogicalVariable>();
                 prevExpressions = new ArrayList<Mutable<ILogicalExpression>>();
                 prevSecondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-                for (List<String> secondaryKey : secondaryKeyFields) {
-                    prepareVarAndExpression(secondaryKey, originalRecType.getFieldNames(), insertOp.getPrevRecordVar(),
+                for (int i = 0; i < secondaryKeyFields.size(); i++) {
+                    List<String> secondaryKey = secondaryKeyFields.get(i);
+                    prepareVarAndExpression(secondaryKey,
+                            (indicators.get(i).intValue() == 0) ? originalRecType.getFieldNames()
+                                    : metaType.getFieldNames(),
+                            (indicators.get(i).intValue() == 0) ? insertOp.getPrevRecordVar()
+                                    : insertOp.getPrevAdditionalNonFilteringVars().get(0),
                             prevExpressions, prevSecondaryKeyVars, context);
                 }
                 prevSecondaryKeyAssign = new AssignOperator(prevSecondaryKeyVars, prevExpressions);
             }
             AssignOperator assign = new AssignOperator(secondaryKeyVars, expressions);
-
             AssignOperator topAssign = assign;
             if (insertOp.getOperation() == Kind.UPSERT) {
                 prevSecondaryKeyAssign.getInputs().add(new MutableObject<ILogicalOperator>(topAssign));
@@ -402,7 +432,9 @@
 
                     IndexInsertDeleteUpsertOperator indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                             insertOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression,
-                            insertOp.getOperation(), insertOp.isBulkload());
+                            insertOp.getOperation(), insertOp.isBulkload(),
+                            insertOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                    : insertOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
 
@@ -415,7 +447,9 @@
                     // When TokenizeOperator is not needed
                     IndexInsertDeleteUpsertOperator indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                             insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
-                            insertOp.getOperation(), insertOp.isBulkload());
+                            insertOp.getOperation(), insertOp.isBulkload(),
+                            insertOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                    : insertOp.getAdditionalNonFilteringExpressions().size());
 
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
                     // We add the necessary expressions for upsert
@@ -482,7 +516,7 @@
                 // We do something similar for previous key if the operation is an upsert
                 if (insertOp.getOperation() == Kind.UPSERT) {
                     List<LogicalVariable> originalKeyVarList = new ArrayList<LogicalVariable>();
-                    List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<Mutable<ILogicalExpression>>();
+                    List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<>();
                     // we don't do any filtering since nulls are expected here and there
                     for (int i = 0; i < numKeys; i++) {
                         LogicalVariable keyVar = context.newVar();
@@ -522,7 +556,9 @@
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
                 IndexInsertDeleteUpsertOperator indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                         insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
-                        insertOp.getOperation(), insertOp.isBulkload());
+                        insertOp.getOperation(), insertOp.isBulkload(),
+                        insertOp.getAdditionalNonFilteringExpressions() == null ? 0
+                                : insertOp.getAdditionalNonFilteringExpressions().size());
                 indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
                 if (insertOp.getOperation() == Kind.UPSERT) {
                     // set old secondary key expressions
@@ -573,6 +609,9 @@
             if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) {
                 continue;
             }
+            if (index.hasMetaFields()) {
+                throw new AlgebricksException("Indexing an open field is only supported on the record part");
+            }
             for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
                 Stack<Pair<ARecordType, String>> nestedTypeStack = new Stack<Pair<ARecordType, String>>();
                 List<String> splits = index.getKeyFieldNames().get(i);
@@ -609,14 +648,10 @@
                     enforcedType = new ARecordType(bridgeName,
                             ArrayUtils.addAll(parent.getFieldNames(), enforcedType.getTypeName()), parentFieldTypes,
                             true);
-
                 } else {
                     //Schema is closed all the way to the field
                     //enforced fields are either null or strongly typed
-                    LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<String, IAType>();
-                    for (j = 0; j < nestedFieldType.getFieldNames().length; j++) {
-                        recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]);
-                    }
+                    LinkedHashMap<String, IAType> recordNameTypesMap = createRecordNameTypeMap(nestedFieldType);
                     // if a an enforced field already exists and the type is correct
                     IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1));
                     if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION
@@ -654,6 +689,14 @@
         return enforcedType;
     }
 
+    private static LinkedHashMap<String, IAType> createRecordNameTypeMap(ARecordType nestedFieldType) {
+        LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<>();
+        for (int j = 0; j < nestedFieldType.getFieldNames().length; j++) {
+            recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]);
+        }
+        return recordNameTypesMap;
+    }
+
     /***
      * This method takes a list of {fields}: a subset of {recordFields}, the original record variable
      * and populate expressions with expressions which evaluate to those fields (using field access functions) and
@@ -675,8 +718,7 @@
             List<Mutable<ILogicalExpression>> expressions, List<LogicalVariable> vars, IOptimizationContext context)
             throws AlgebricksException {
         // Get a reference to the record variable
-        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
-                new VariableReferenceExpression(recordVar));
+        Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(recordVar));
         // Get the desired field position
         int pos = -1;
         if (fields.size() == 1) {
@@ -730,12 +772,12 @@
             if (!NonTaggedFormatUtil.isOptional(secondaryKeyType) && !forceFilter) {
                 continue;
             }
-            ScalarFunctionCallExpression isNullFuncExpr = new ScalarFunctionCallExpression(
+            ScalarFunctionCallExpression isUnknownFuncExpr = new ScalarFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_UNKOWN),
                     new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
             ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT),
-                    new MutableObject<ILogicalExpression>(isNullFuncExpr));
+                    new MutableObject<ILogicalExpression>(isUnknownFuncExpr));
             filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
         }
         // No nullable secondary keys.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index d17b663..ed1803d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -108,7 +108,7 @@
 
     protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree,
             Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
         // Check applicability of indexes by access method type.
         while (amIt.hasNext()) {
@@ -257,7 +257,7 @@
                                 @Override
                                 public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
                                         List<List<LogicalVariable>> correlatedNullableVariableLists)
-                                                throws AlgebricksException {
+                                        throws AlgebricksException {
                                     if (var.equals(optFuncExpr.getSourceVar(exprAndVarIdx.second))) {
                                         return keyType;
                                     }
@@ -435,7 +435,7 @@
     protected boolean fillIndexExprs(List<Index> datasetIndexes, List<String> fieldName, IAType fieldType,
             IOptimizableFuncExpr optFuncExpr, int matchedFuncExprIndex, int varIdx,
             OptimizableOperatorSubTree matchedSubTree, AccessMethodAnalysisContext analysisCtx)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         List<Index> indexCandidates = new ArrayList<Index>();
         // Add an index to the candidates if one of the indexed fields is
         // fieldName
@@ -462,9 +462,18 @@
             IOptimizationContext context) throws AlgebricksException {
         int optFuncExprIndex = 0;
         List<Index> datasetIndexes = new ArrayList<Index>();
+        LogicalVariable datasetMetaVar = null;
+        LogicalVariable datasetRecordVar = null;
         if (subTree.dataSourceType != DataSourceType.COLLECTION_SCAN) {
             datasetIndexes = metadataProvider.getDatasetIndexes(subTree.dataset.getDataverseName(),
                     subTree.dataset.getDatasetName());
+            List<LogicalVariable> datasetVars = subTree.getDataSourceVariables();
+            if (subTree.dataset.hasMetaPart()) {
+                datasetMetaVar = datasetVars.get(datasetVars.size() - 1);
+                datasetRecordVar = datasetVars.get(datasetVars.size() - 2);
+            } else {
+                datasetRecordVar = datasetVars.get(datasetVars.size() - 1);
+            }
         }
         for (IOptimizableFuncExpr optFuncExpr : analysisCtx.matchedFuncExprs) {
             // Try to match variables from optFuncExpr to assigns or unnests.
@@ -472,73 +481,11 @@
                     .size(); assignOrUnnestIndex++) {
                 AbstractLogicalOperator op = subTree.assignsAndUnnests.get(assignOrUnnestIndex);
                 if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-                    AssignOperator assignOp = (AssignOperator) op;
-                    List<LogicalVariable> varList = assignOp.getVariables();
-                    for (int varIndex = 0; varIndex < varList.size(); varIndex++) {
-                        LogicalVariable var = varList.get(varIndex);
-                        int optVarIndex = optFuncExpr.findLogicalVar(var);
-                        // No matching var in optFuncExpr.
-                        if (optVarIndex == -1) {
-                            continue;
-                        }
-                        // At this point we have matched the optimizable func
-                        // expr at optFuncExprIndex to an assigned variable.
-                        // Remember matching subtree.
-                        optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
-                        List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex,
-                                varIndex, subTree.recordType, optVarIndex,
-                                optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue());
-                        if (fieldName == null) {
-                            continue;
-                        }
-                        IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
-                                .getType(optFuncExpr.getLogicalExpr(optVarIndex));
-                        // Set the fieldName in the corresponding matched
-                        // function expression.
-                        optFuncExpr.setFieldName(optVarIndex, fieldName);
-                        optFuncExpr.setFieldType(optVarIndex, fieldType);
-
-                        setTypeTag(context, subTree, optFuncExpr, optVarIndex);
-                        if (subTree.hasDataSource()) {
-                            fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex,
-                                    optVarIndex, subTree, analysisCtx);
-                        }
-                    }
+                    analyzeAssignOp((AssignOperator) op, optFuncExpr, subTree, assignOrUnnestIndex, datasetRecordVar,
+                            datasetMetaVar, context, datasetIndexes, optFuncExprIndex, analysisCtx);
                 } else {
-                    UnnestOperator unnestOp = (UnnestOperator) op;
-                    LogicalVariable var = unnestOp.getVariable();
-                    int funcVarIndex = optFuncExpr.findLogicalVar(var);
-                    // No matching var in optFuncExpr.
-                    if (funcVarIndex == -1) {
-                        continue;
-                    }
-                    // At this point we have matched the optimizable func expr
-                    // at optFuncExprIndex to an unnest variable.
-                    // Remember matching subtree.
-                    optFuncExpr.setOptimizableSubTree(funcVarIndex, subTree);
-                    List<String> fieldName = null;
-                    if (subTree.dataSourceType == DataSourceType.COLLECTION_SCAN) {
-                        optFuncExpr.setLogicalExpr(funcVarIndex, new VariableReferenceExpression(var));
-                    } else {
-                        fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, 0,
-                                subTree.recordType, funcVarIndex,
-                                optFuncExpr.getFuncExpr().getArguments().get(funcVarIndex).getValue());
-                        if (fieldName == null) {
-                            continue;
-                        }
-                    }
-                    IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
-                            .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
-                    // Set the fieldName in the corresponding matched function
-                    // expression.
-                    optFuncExpr.setFieldName(funcVarIndex, fieldName);
-                    optFuncExpr.setFieldType(funcVarIndex, fieldType);
-
-                    setTypeTag(context, subTree, optFuncExpr, funcVarIndex);
-                    if (subTree.hasDataSource()) {
-                        fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex,
-                                funcVarIndex, subTree, analysisCtx);
-                    }
+                    analyzeUnnestOp((UnnestOperator) op, optFuncExpr, subTree, assignOrUnnestIndex, datasetRecordVar,
+                            datasetMetaVar, context, datasetIndexes, optFuncExprIndex, analysisCtx);
                 }
             }
 
@@ -567,10 +514,88 @@
         }
     }
 
+    private void analyzeUnnestOp(UnnestOperator unnestOp, IOptimizableFuncExpr optFuncExpr,
+            OptimizableOperatorSubTree subTree, int assignOrUnnestIndex, LogicalVariable datasetRecordVar,
+            LogicalVariable datasetMetaVar, IOptimizationContext context, List<Index> datasetIndexes,
+            int optFuncExprIndex, AccessMethodAnalysisContext analysisCtx) throws AlgebricksException {
+        LogicalVariable var = unnestOp.getVariable();
+        int funcVarIndex = optFuncExpr.findLogicalVar(var);
+        // No matching var in optFuncExpr.
+        if (funcVarIndex == -1) {
+            return;
+        }
+        // At this point we have matched the optimizable func expr
+        // at optFuncExprIndex to an unnest variable.
+        // Remember matching subtree.
+        optFuncExpr.setOptimizableSubTree(funcVarIndex, subTree);
+        List<String> fieldName = null;
+        if (subTree.dataSourceType == DataSourceType.COLLECTION_SCAN) {
+            optFuncExpr.setLogicalExpr(funcVarIndex, new VariableReferenceExpression(var));
+        } else {
+            fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, 0, subTree.recordType,
+                    funcVarIndex, optFuncExpr.getFuncExpr().getArguments().get(funcVarIndex).getValue(),
+                    datasetRecordVar, subTree.metaRecordType, datasetMetaVar);
+            if (fieldName == null) {
+                return;
+            }
+        }
+        IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
+                .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
+        // Set the fieldName in the corresponding matched function
+        // expression.
+        optFuncExpr.setFieldName(funcVarIndex, fieldName);
+        optFuncExpr.setFieldType(funcVarIndex, fieldType);
+
+        setTypeTag(context, subTree, optFuncExpr, funcVarIndex);
+        if (subTree.hasDataSource()) {
+            fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex, funcVarIndex, subTree,
+                    analysisCtx);
+        }
+    }
+
+    private void analyzeAssignOp(AssignOperator assignOp, IOptimizableFuncExpr optFuncExpr,
+            OptimizableOperatorSubTree subTree, int assignOrUnnestIndex, LogicalVariable datasetRecordVar,
+            LogicalVariable datasetMetaVar, IOptimizationContext context, List<Index> datasetIndexes,
+            int optFuncExprIndex, AccessMethodAnalysisContext analysisCtx) throws AlgebricksException {
+        List<LogicalVariable> varList = assignOp.getVariables();
+        for (int varIndex = 0; varIndex < varList.size(); varIndex++) {
+            LogicalVariable var = varList.get(varIndex);
+            int optVarIndex = optFuncExpr.findLogicalVar(var);
+            // No matching var in optFuncExpr.
+            if (optVarIndex == -1) {
+                continue;
+            }
+            // At this point we have matched the optimizable func
+            // expr at optFuncExprIndex to an assigned variable.
+            // Remember matching subtree.
+            optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
+
+            List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
+                    subTree.recordType, optVarIndex,
+                    optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), datasetRecordVar,
+                    subTree.metaRecordType, datasetMetaVar);
+            if (fieldName == null) {
+                continue;
+            }
+            IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
+                    .getType(optFuncExpr.getLogicalExpr(optVarIndex));
+            // Set the fieldName in the corresponding matched
+            // function expression.
+            optFuncExpr.setFieldName(optVarIndex, fieldName);
+            optFuncExpr.setFieldType(optVarIndex, fieldType);
+
+            setTypeTag(context, subTree, optFuncExpr, optVarIndex);
+            if (subTree.hasDataSource()) {
+                fillIndexExprs(datasetIndexes, fieldName, fieldType, optFuncExpr, optFuncExprIndex, optVarIndex,
+                        subTree, analysisCtx);
+            }
+        }
+    }
+
     private void matchVarsFromOptFuncExprToDataSourceScan(IOptimizableFuncExpr optFuncExpr, int optFuncExprIndex,
             List<Index> datasetIndexes, List<LogicalVariable> dsVarList, OptimizableOperatorSubTree subTree,
             AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean fromAdditionalDataSource)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) {
             LogicalVariable var = dsVarList.get(varIndex);
             int funcVarIndex = optFuncExpr.findLogicalVar(var);
@@ -640,7 +665,8 @@
      */
     protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree subTree,
             int opIndex, int assignVarIndex, ARecordType recordType, int funcVarIndex,
-            ILogicalExpression parentFuncExpr) throws AlgebricksException {
+            ILogicalExpression parentFuncExpr, LogicalVariable recordVar, ARecordType metaType, LogicalVariable metaVar)
+            throws AlgebricksException {
         // Get expression corresponding to opVar at varIndex.
         AbstractLogicalExpression expr = null;
         AbstractFunctionCallExpression childFuncExpr = null;
@@ -705,6 +731,8 @@
             isByName = true;
         }
         if (isFieldAccess) {
+            LogicalVariable sourceVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+                    .getVariableReference();
             optFuncExpr.setLogicalExpr(funcVarIndex, parentFuncExpr);
             int[] assignAndExpressionIndexes = null;
 
@@ -743,7 +771,7 @@
                 //Recursive call on nested assign
                 List<String> parentFieldNames = getFieldNameFromSubTree(optFuncExpr, subTree,
                         assignAndExpressionIndexes[0], assignAndExpressionIndexes[1], recordType, funcVarIndex,
-                        parentFuncExpr);
+                        parentFuncExpr, recordVar, metaType, metaVar);
 
                 if (parentFieldNames == null) {
                     //Nested assign was not a field access.
@@ -752,8 +780,9 @@
                 }
 
                 if (!isByName) {
-                    fieldName = ((ARecordType) recordType.getSubFieldType(parentFieldNames))
-                            .getFieldNames()[fieldIndex];
+                    fieldName = sourceVar.equals(metaVar)
+                            ? ((ARecordType) metaType.getSubFieldType(parentFieldNames)).getFieldNames()[fieldIndex]
+                            : ((ARecordType) recordType.getSubFieldType(parentFieldNames)).getFieldNames()[fieldIndex];
                 }
                 optFuncExpr.setSourceVar(funcVarIndex, ((AssignOperator) op).getVariables().get(assignVarIndex));
                 //add fieldName to the nested fieldName, return
@@ -773,9 +802,10 @@
                 if (nestedAccessFieldName != null) {
                     return nestedAccessFieldName;
                 }
-                return new ArrayList<String>(Arrays.asList(fieldName));
+                return new ArrayList<>(Arrays.asList(fieldName));
             }
-            return new ArrayList<String>(Arrays.asList(recordType.getFieldNames()[fieldIndex]));
+            return new ArrayList<>(Arrays.asList(sourceVar.equals(metaVar) ? metaType.getFieldNames()[fieldIndex]
+                    : recordType.getFieldNames()[fieldIndex]));
 
         }
 
@@ -806,7 +836,7 @@
                     if (var.equals(curVar)) {
                         optFuncExpr.setSourceVar(funcVarIndex, var);
                         return getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex, recordType,
-                                funcVarIndex, childFuncExpr);
+                                funcVarIndex, childFuncExpr, recordVar, metaType, metaVar);
                     }
                 }
             } else {
@@ -814,7 +844,7 @@
                 LogicalVariable var = unnestOp.getVariable();
                 if (var.equals(curVar)) {
                     getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, 0, recordType, funcVarIndex,
-                            childFuncExpr);
+                            childFuncExpr, recordVar, metaType, metaVar);
                 }
             }
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index bfa29ec..0040b61 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -135,7 +135,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
@@ -216,19 +215,19 @@
         ILogicalOperator etsOp = new EmptyTupleSourceOperator();
 
         // Add a logical variable for the record.
-        List<LogicalVariable> payloadVars = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> payloadVars = new ArrayList<>();
         payloadVars.add(context.newVar());
 
         // Create a scan operator and make the empty tuple source its input
         DataSourceScanOperator dssOp = new DataSourceScanOperator(payloadVars, lds);
-        dssOp.getInputs().add(new MutableObject<ILogicalOperator>(etsOp));
+        dssOp.getInputs().add(new MutableObject<>(etsOp));
         ILogicalExpression payloadExpr = new VariableReferenceExpression(payloadVars.get(0));
-        Mutable<ILogicalExpression> payloadRef = new MutableObject<ILogicalExpression>(payloadExpr);
+        Mutable<ILogicalExpression> payloadRef = new MutableObject<>(payloadExpr);
 
         // Creating the assign to extract the PK out of the record
-        ArrayList<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
-        ArrayList<Mutable<ILogicalExpression>> pkExprs = new ArrayList<Mutable<ILogicalExpression>>();
-        List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<LogicalVariable> pkVars = new ArrayList<>();
+        ArrayList<Mutable<ILogicalExpression>> pkExprs = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<>();
         LogicalVariable payloadVar = payloadVars.get(0);
         for (List<String> keyFieldName : partitionKeys) {
             PlanTranslationUtil.prepareVarAndExpression(keyFieldName, payloadVar, pkVars, pkExprs, varRefsForLoading,
@@ -236,11 +235,11 @@
         }
 
         AssignOperator assign = new AssignOperator(pkVars, pkExprs);
-        assign.getInputs().add(new MutableObject<ILogicalOperator>(dssOp));
+        assign.getInputs().add(new MutableObject<>(dssOp));
 
         // If the input is pre-sorted, we set the ordering property explicitly in the assign
         if (clffs.alreadySorted()) {
-            List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+            List<OrderColumn> orderColumns = new ArrayList<>();
             for (int i = 0; i < pkVars.size(); ++i) {
                 orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
             }
@@ -253,9 +252,9 @@
         List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
         AssignOperator additionalFilteringAssign = null;
         if (additionalFilteringField != null) {
-            additionalFilteringVars = new ArrayList<LogicalVariable>();
-            additionalFilteringAssignExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            additionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            additionalFilteringVars = new ArrayList<>();
+            additionalFilteringAssignExpressions = new ArrayList<>();
+            additionalFilteringExpressions = new ArrayList<>();
             PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, payloadVar, additionalFilteringVars,
                     additionalFilteringAssignExpressions, additionalFilteringExpressions, context);
             additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
@@ -267,15 +266,15 @@
         insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
 
         if (additionalFilteringAssign != null) {
-            additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(assign));
-            insertOp.getInputs().add(new MutableObject<ILogicalOperator>(additionalFilteringAssign));
+            additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
+            insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
         } else {
-            insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+            insertOp.getInputs().add(new MutableObject<>(assign));
         }
 
         ILogicalOperator leafOperator = new SinkOperator();
-        leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
-        return new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(leafOperator));
+        leafOperator.getInputs().add(new MutableObject<>(insertOp));
+        return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
     }
 
     @SuppressWarnings("unchecked")
@@ -283,8 +282,8 @@
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
             throws AlgebricksException, AsterixException {
         Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this,
-                new MutableObject<ILogicalOperator>(new EmptyTupleSourceOperator()));
-        ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<Mutable<ILogicalOperator>>();
+                new MutableObject<>(new EmptyTupleSourceOperator()));
+        ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>();
         ILogicalOperator topOp = p.first;
         ProjectOperator project = (ProjectOperator) topOp;
         LogicalVariable unnestVar = project.getVariables().get(0);
@@ -297,12 +296,12 @@
             }
             metadataProvider.setOutputFile(outputFileSplit);
 
-            List<Mutable<ILogicalExpression>> writeExprList = new ArrayList<Mutable<ILogicalExpression>>(1);
-            writeExprList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)));
+            List<Mutable<ILogicalExpression>> writeExprList = new ArrayList<>(1);
+            writeExprList.add(new MutableObject<>(new VariableReferenceExpression(resVar)));
             ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
             ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
             topOp = new DistributeResultOperator(writeExprList, sink);
-            topOp.getInputs().add(new MutableObject<ILogicalOperator>(project));
+            topOp.getInputs().add(new MutableObject<>(project));
 
             // Retrieve the Output RecordType (if any) and store it on
             // the DistributeResultOperator
@@ -318,11 +317,10 @@
             LogicalVariable seqVar = context.newVar();
             /** This assign adds a marker function collection-to-sequence: if the input is a singleton collection, unnest it; otherwise do nothing. */
             AssignOperator assignCollectionToSequence = new AssignOperator(seqVar,
-                    new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    new MutableObject<>(new ScalarFunctionCallExpression(
                             FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.COLLECTION_TO_SEQUENCE),
-                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)))));
-            assignCollectionToSequence.getInputs()
-                    .add(new MutableObject<ILogicalOperator>(project.getInputs().get(0).getValue()));
+                            new MutableObject<>(new VariableReferenceExpression(resVar)))));
+            assignCollectionToSequence.getInputs().add(new MutableObject<>(project.getInputs().get(0).getValue()));
             project.getInputs().get(0).setValue(assignCollectionToSequence);
             project.getVariables().set(0, seqVar);
             resVar = seqVar;
@@ -330,9 +328,9 @@
                     stmt.getDatasetName());
             List<Integer> keySourceIndicator = ((InternalDatasetDetails) targetDatasource.getDataset()
                     .getDatasetDetails()).getKeySourceIndicator();
-            ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();
-            ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
-            List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
+            ArrayList<LogicalVariable> vars = new ArrayList<>();
+            ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
+            List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<>();
             List<List<String>> partitionKeys = DatasetUtils.getPartitioningKeys(targetDatasource.getDataset());
             int numOfPrimaryKeys = partitionKeys.size();
             for (int i = 0; i < numOfPrimaryKeys; i++) {
@@ -354,23 +352,22 @@
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
             AssignOperator additionalFilteringAssign = null;
             if (additionalFilteringField != null) {
-                additionalFilteringVars = new ArrayList<LogicalVariable>();
-                additionalFilteringAssignExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-                additionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+                additionalFilteringVars = new ArrayList<>();
+                additionalFilteringAssignExpressions = new ArrayList<>();
+                additionalFilteringExpressions = new ArrayList<>();
 
                 PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, resVar, additionalFilteringVars,
                         additionalFilteringAssignExpressions, additionalFilteringExpressions, context);
 
                 additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
                         additionalFilteringAssignExpressions);
-                additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(project));
-                assign.getInputs().add(new MutableObject<ILogicalOperator>(additionalFilteringAssign));
+                additionalFilteringAssign.getInputs().add(new MutableObject<>(project));
+                assign.getInputs().add(new MutableObject<>(additionalFilteringAssign));
             } else {
-                assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
+                assign.getInputs().add(new MutableObject<>(project));
             }
 
-            Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
-                    new VariableReferenceExpression(resVar));
+            Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(resVar));
             ILogicalOperator leafOperator = null;
 
             switch (stmt.getKind()) {
@@ -382,9 +379,9 @@
                     InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
                             varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
                     insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+                    insertOp.getInputs().add(new MutableObject<>(assign));
                     leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+                    leafOperator.getInputs().add(new MutableObject<>(insertOp));
                     break;
                 }
                 case UPSERT: {
@@ -395,7 +392,7 @@
                     InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
                             varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false);
                     upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    upsertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+                    upsertOp.getInputs().add(new MutableObject<>(assign));
                     // Create and add a new variable used for representing the original record
                     ARecordType recordType = (ARecordType) targetDatasource.getItemType();
                     upsertOp.setPrevRecordVar(context.newVar());
@@ -405,7 +402,7 @@
                         upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
                     }
                     leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertOp));
+                    leafOperator.getInputs().add(new MutableObject<>(upsertOp));
                     break;
                 }
                 case DELETE: {
@@ -416,18 +413,18 @@
                     InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
                             varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
                     deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    deleteOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+                    deleteOp.getInputs().add(new MutableObject<>(assign));
                     leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(deleteOp));
+                    leafOperator.getInputs().add(new MutableObject<>(deleteOp));
                     break;
                 }
                 case CONNECT_FEED: {
                     InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
                             varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
                     insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+                    insertOp.getInputs().add(new MutableObject<>(assign));
                     leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+                    leafOperator.getInputs().add(new MutableObject<>(insertOp));
                     break;
                 }
                 case SUBSCRIBE_FEED: {
@@ -448,14 +445,13 @@
                         // add the meta function
                         IFunctionInfo finfoMeta = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.META);
                         ScalarFunctionCallExpression metaFunction = new ScalarFunctionCallExpression(finfoMeta,
-                                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnestVar)));
+                                new MutableObject<>(new VariableReferenceExpression(unnestVar)));
                         // create assign for the meta part
                         LogicalVariable metaVar = context.newVar();
                         metaExpSingletonList = new ArrayList<>(1);
-                        metaExpSingletonList
-                                .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(metaVar)));
+                        metaExpSingletonList.add(new MutableObject<>(new VariableReferenceExpression(metaVar)));
                         metaAndKeysVars.add(metaVar);
-                        metaAndKeysExprs.add(new MutableObject<ILogicalExpression>(metaFunction));
+                        metaAndKeysExprs.add(new MutableObject<>(metaFunction));
                         project.getVariables().add(metaVar);
                     }
                     if (isChangeFeed) {
@@ -467,10 +463,9 @@
                                 funcCall.substituteVar(resVar, unnestVar);
                                 LogicalVariable pkVar = context.newVar();
                                 metaAndKeysVars.add(pkVar);
-                                metaAndKeysExprs.add(new MutableObject<ILogicalExpression>(assignExpr.getValue()));
+                                metaAndKeysExprs.add(new MutableObject<>(assignExpr.getValue()));
                                 project.getVariables().add(pkVar);
-                                varRefsForLoading.add(
-                                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(pkVar)));
+                                varRefsForLoading.add(new MutableObject<>(new VariableReferenceExpression(pkVar)));
                             }
                         }
                         // A change feed, we don't need the assign to access PKs
@@ -479,6 +474,15 @@
                         // Create and add a new variable used for representing the original record
                         feedModificationOp.setPrevRecordVar(context.newVar());
                         feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
+                        if (targetDatasource.getDataset().hasMetaPart()) {
+                            List<LogicalVariable> metaVars = new ArrayList<>();
+                            metaVars.add(context.newVar());
+                            feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
+                            List<Object> metaTypes = new ArrayList<>();
+                            metaTypes.add(targetDatasource.getMetaItemType());
+                            feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+                        }
+
                         if (additionalFilteringField != null) {
                             feedModificationOp.setPrevFilterVar(context.newVar());
                             feedModificationOp.setPrevFilterType(((ARecordType) targetDatasource.getItemType())
@@ -492,16 +496,16 @@
                     } else {
                         feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
                                 metaExpSingletonList, InsertDeleteUpsertOperator.Kind.INSERT, false);
-                        feedModificationOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+                        feedModificationOp.getInputs().add(new MutableObject<>(assign));
                     }
                     if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
                         metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
                         metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
-                        project.getInputs().set(0, new MutableObject<ILogicalOperator>(metaAndKeysAssign));
+                        project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
                     }
                     feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
                     leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(feedModificationOp));
+                    leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
                     break;
                 }
                 default:
@@ -509,7 +513,7 @@
             }
             topOp = leafOperator;
         }
-        globalPlanRoots.add(new MutableObject<ILogicalOperator>(topOp));
+        globalPlanRoots.add(new MutableObject<>(topOp));
         ILogicalPlan plan = new ALogicalPlanImpl(globalPlanRoots);
         eliminateSharedOperatorReferenceForPlan(plan);
         return plan;
@@ -552,8 +556,7 @@
             case VARIABLE_EXPRESSION: {
                 v = context.newVar(lc.getVarExpr());
                 LogicalVariable prev = context.getVar(((VariableExpr) lc.getBindingExpr()).getVar().getId());
-                returnedOp = new AssignOperator(v,
-                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(prev)));
+                returnedOp = new AssignOperator(v, new MutableObject<>(new VariableReferenceExpression(prev)));
                 returnedOp.getInputs().add(tupSource);
                 break;
             }
@@ -561,12 +564,12 @@
                 v = context.newVar(lc.getVarExpr());
                 Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(lc.getBindingExpr(),
                         tupSource);
-                returnedOp = new AssignOperator(v, new MutableObject<ILogicalExpression>(eo.first));
+                returnedOp = new AssignOperator(v, new MutableObject<>(eo.first));
                 returnedOp.getInputs().add(eo.second);
                 break;
             }
         }
-        return new Pair<ILogicalOperator, LogicalVariable>(returnedOp, v);
+        return new Pair<>(returnedOp, v);
     }
 
     @Override
@@ -576,13 +579,13 @@
         LogicalVariable v = context.newVar();
         AbstractFunctionCallExpression fldAccess = new ScalarFunctionCallExpression(
                 FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME));
-        fldAccess.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
+        fldAccess.getArguments().add(new MutableObject<>(p.first));
         ILogicalExpression faExpr = new ConstantExpression(
                 new AsterixConstantValue(new AString(fa.getIdent().getValue())));
-        fldAccess.getArguments().add(new MutableObject<ILogicalExpression>(faExpr));
-        AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(fldAccess));
+        fldAccess.getArguments().add(new MutableObject<>(faExpr));
+        AssignOperator a = new AssignOperator(v, new MutableObject<>(fldAccess));
         a.getInputs().add(p.second);
-        return new Pair<ILogicalOperator, LogicalVariable>(a, v);
+        return new Pair<>(a, v);
     }
 
     @Override
@@ -594,17 +597,17 @@
         if (ia.isAny()) {
             f = new ScalarFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.ANY_COLLECTION_MEMBER));
-            f.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
+            f.getArguments().add(new MutableObject<>(p.first));
         } else {
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> indexPair = langExprToAlgExpression(ia.getIndexExpr(),
                     tupSource);
             f = new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM));
-            f.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
-            f.getArguments().add(new MutableObject<ILogicalExpression>(indexPair.first));
+            f.getArguments().add(new MutableObject<>(p.first));
+            f.getArguments().add(new MutableObject<>(indexPair.first));
         }
-        AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(f));
+        AssignOperator a = new AssignOperator(v, new MutableObject<>(f));
         a.getInputs().add(p.second);
-        return new Pair<ILogicalOperator, LogicalVariable>(a, v);
+        return new Pair<>(a, v);
     }
 
     @Override
@@ -612,26 +615,26 @@
             throws AsterixException {
         LogicalVariable v = context.newVar();
         FunctionSignature signature = fcall.getFunctionSignature();
-        List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> args = new ArrayList<>();
         Mutable<ILogicalOperator> topOp = tupSource;
 
         for (Expression expr : fcall.getExprList()) {
             switch (expr.getKind()) {
                 case VARIABLE_EXPRESSION: {
                     LogicalVariable var = context.getVar(((VariableExpr) expr).getVar().getId());
-                    args.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
+                    args.add(new MutableObject<>(new VariableReferenceExpression(var)));
                     break;
                 }
                 case LITERAL_EXPRESSION: {
                     LiteralExpr val = (LiteralExpr) expr;
-                    args.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                    args.add(new MutableObject<>(new ConstantExpression(
                             new AsterixConstantValue(ConstantHelper.objectFromLiteral(val.getValue())))));
                     break;
                 }
                 default: {
                     Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(expr, topOp);
                     AbstractLogicalOperator o1 = (AbstractLogicalOperator) eo.second.getValue();
-                    args.add(new MutableObject<ILogicalExpression>(eo.first));
+                    args.add(new MutableObject<>(eo.first));
                     if (o1 != null && !(o1.getOperatorTag() == LogicalOperatorTag.ASSIGN && hasOnlyChild(o1, topOp))) {
                         topOp = eo.second;
                     }
@@ -656,12 +659,12 @@
             }
         }
 
-        AssignOperator op = new AssignOperator(v, new MutableObject<ILogicalExpression>(f));
+        AssignOperator op = new AssignOperator(v, new MutableObject<>(f));
         if (topOp != null) {
             op.getInputs().add(topOp);
         }
 
-        return new Pair<ILogicalOperator, LogicalVariable>(op, v);
+        return new Pair<>(op, v);
     }
 
     private AbstractFunctionCallExpression lookupUserDefinedFunction(FunctionSignature signature,
@@ -733,17 +736,17 @@
             for (Pair<Expression, Identifier> groupField : groupFieldList) {
                 ILogicalExpression groupFieldNameExpr = langExprToAlgExpression(
                         new LiteralExpr(new StringLiteral(groupField.second.getValue())), topOp).first;
-                groupRecordConstructorArgList.add(new MutableObject<ILogicalExpression>(groupFieldNameExpr));
+                groupRecordConstructorArgList.add(new MutableObject<>(groupFieldNameExpr));
                 ILogicalExpression groupFieldExpr = langExprToAlgExpression(groupField.first, topOp).first;
-                groupRecordConstructorArgList.add(new MutableObject<ILogicalExpression>(groupFieldExpr));
+                groupRecordConstructorArgList.add(new MutableObject<>(groupFieldExpr));
             }
             LogicalVariable groupVar = context.newVar(gc.getGroupVar());
             AssignOperator groupVarAssignOp = new AssignOperator(groupVar,
-                    new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    new MutableObject<>(new ScalarFunctionCallExpression(
                             FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR),
                             groupRecordConstructorArgList)));
             groupVarAssignOp.getInputs().add(topOp);
-            topOp = new MutableObject<ILogicalOperator>(groupVarAssignOp);
+            topOp = new MutableObject<>(groupVarAssignOp);
         }
         if (gc.isGroupAll()) {
             List<LogicalVariable> aggVars = new ArrayList<>();
@@ -751,19 +754,19 @@
             for (VariableExpr var : gc.getWithVarList()) {
                 LogicalVariable aggVar = context.newVar();
                 LogicalVariable oldVar = context.getVar(var);
-                List<Mutable<ILogicalExpression>> flArgs = new ArrayList<Mutable<ILogicalExpression>>();
-                flArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oldVar)));
+                List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>();
+                flArgs.add(new MutableObject<>(new VariableReferenceExpression(oldVar)));
                 AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
                         .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
                 aggVars.add(aggVar);
-                aggFuncs.add(new MutableObject<ILogicalExpression>(fListify));
+                aggFuncs.add(new MutableObject<>(fListify));
                 // Hide the variable that was part of the "with", replacing it with
                 // the one bound by the aggregation op.
                 context.setVar(var, aggVar);
             }
             AggregateOperator aggOp = new AggregateOperator(aggVars, aggFuncs);
             aggOp.getInputs().add(topOp);
-            return new Pair<ILogicalOperator, LogicalVariable>(aggOp, null);
+            return new Pair<>(aggOp, null);
         } else {
             GroupByOperator gOp = new GroupByOperator();
             for (GbyVariableExpressionPair ve : gc.getGbyPairList()) {
@@ -795,23 +798,22 @@
             for (VariableExpr var : gc.getWithVarList()) {
                 LogicalVariable aggVar = context.newVar();
                 LogicalVariable oldVar = context.getVar(var);
-                List<Mutable<ILogicalExpression>> flArgs = new ArrayList<Mutable<ILogicalExpression>>(1);
-                flArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oldVar)));
+                List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
+                flArgs.add(new MutableObject<>(new VariableReferenceExpression(oldVar)));
                 AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
                         .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
                 AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(aggVar),
-                        (List) mkSingletonArrayList(new MutableObject<ILogicalExpression>(fListify)));
+                        (List) mkSingletonArrayList(new MutableObject<>(fListify)));
 
-                agg.getInputs().add(new MutableObject<ILogicalOperator>(
-                        new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gOp))));
-                ILogicalPlan plan = new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(agg));
+                agg.getInputs().add(new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
+                ILogicalPlan plan = new ALogicalPlanImpl(new MutableObject<>(agg));
                 gOp.getNestedPlans().add(plan);
                 // Hide the variable that was part of the "with", replacing it with
                 // the one bound by the aggregation op.
                 context.setVar(var, aggVar);
             }
             gOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, gc.hasHashGroupByHint());
-            return new Pair<ILogicalOperator, LogicalVariable>(gOp, null);
+            return new Pair<>(gOp, null);
         }
 
     }
@@ -833,54 +835,53 @@
 
         //Creates a subplan for the "then" branch.
         Pair<ILogicalOperator, LogicalVariable> opAndVarForThen = constructSubplanOperatorForBranch(pCond.first,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varCond)), ifexpr.getThenExpr());
+                new MutableObject<>(new VariableReferenceExpression(varCond)), ifexpr.getThenExpr());
 
         // Creates a subplan for the "else" branch.
         AbstractFunctionCallExpression notVarCond = new ScalarFunctionCallExpression(
-                FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT), Collections.singletonList(
-                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varCond))));
+                FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT),
+                Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(varCond))));
         Pair<ILogicalOperator, LogicalVariable> opAndVarForElse = constructSubplanOperatorForBranch(
-                opAndVarForThen.first, new MutableObject<ILogicalExpression>(notVarCond), ifexpr.getElseExpr());
+                opAndVarForThen.first, new MutableObject<>(notVarCond), ifexpr.getElseExpr());
 
         // Uses switch-case function to select the results of two branches.
         LogicalVariable selectVar = context.newVar();
         List<Mutable<ILogicalExpression>> arguments = new ArrayList<>();
-        arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varCond)));
-        arguments.add(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
-        arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(opAndVarForThen.second)));
-        arguments.add(new MutableObject<ILogicalExpression>(ConstantExpression.FALSE));
-        arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(opAndVarForElse.second)));
+        arguments.add(new MutableObject<>(new VariableReferenceExpression(varCond)));
+        arguments.add(new MutableObject<>(ConstantExpression.TRUE));
+        arguments.add(new MutableObject<>(new VariableReferenceExpression(opAndVarForThen.second)));
+        arguments.add(new MutableObject<>(ConstantExpression.FALSE));
+        arguments.add(new MutableObject<>(new VariableReferenceExpression(opAndVarForElse.second)));
         AbstractFunctionCallExpression swithCaseExpr = new ScalarFunctionCallExpression(
                 FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SWITCH_CASE), arguments);
-        AssignOperator assignOp = new AssignOperator(selectVar, new MutableObject<ILogicalExpression>(swithCaseExpr));
-        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(opAndVarForElse.first));
+        AssignOperator assignOp = new AssignOperator(selectVar, new MutableObject<>(swithCaseExpr));
+        assignOp.getInputs().add(new MutableObject<>(opAndVarForElse.first));
 
         // Unnests the selected ("if" or "else") result.
         LogicalVariable unnestVar = context.newVar();
         UnnestOperator unnestOp = new UnnestOperator(unnestVar,
-                new MutableObject<ILogicalExpression>(new UnnestingFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
-                        Collections.singletonList(
-                                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(selectVar))))));
-        unnestOp.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
+                new MutableObject<>(new UnnestingFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), Collections
+                                .singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
+        unnestOp.getInputs().add(new MutableObject<>(assignOp));
 
         // Produces the final result.
         LogicalVariable resultVar = context.newVar();
         AssignOperator finalAssignOp = new AssignOperator(resultVar,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnestVar)));
-        finalAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(unnestOp));
-        return new Pair<ILogicalOperator, LogicalVariable>(finalAssignOp, resultVar);
+                new MutableObject<>(new VariableReferenceExpression(unnestVar)));
+        finalAssignOp.getInputs().add(new MutableObject<>(unnestOp));
+        return new Pair<>(finalAssignOp, resultVar);
     }
 
     @Override
     public Pair<ILogicalOperator, LogicalVariable> visit(LiteralExpr l, Mutable<ILogicalOperator> tupSource) {
         LogicalVariable var = context.newVar();
-        AssignOperator a = new AssignOperator(var, new MutableObject<ILogicalExpression>(
+        AssignOperator a = new AssignOperator(var, new MutableObject<>(
                 new ConstantExpression(new AsterixConstantValue(ConstantHelper.objectFromLiteral(l.getValue())))));
         if (tupSource != null) {
             a.getInputs().add(tupSource);
         }
-        return new Pair<ILogicalOperator, LogicalVariable>(a, var);
+        return new Pair<>(a, var);
     }
 
     @Override
@@ -910,7 +911,7 @@
 
                     // chain the operators
                     if (i == 0) {
-                        c.getArguments().add(new MutableObject<ILogicalExpression>(e));
+                        c.getArguments().add(new MutableObject<>(e));
                         currExpr = c;
                         if (op.isBroadcastOperand(i)) {
                             BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
@@ -918,9 +919,8 @@
                             c.getAnnotations().put(BroadcastExpressionAnnotation.BROADCAST_ANNOTATION_KEY, bcast);
                         }
                     } else {
-                        ((AbstractFunctionCallExpression) currExpr).getArguments()
-                                .add(new MutableObject<ILogicalExpression>(e));
-                        c.getArguments().add(new MutableObject<ILogicalExpression>(currExpr));
+                        ((AbstractFunctionCallExpression) currExpr).getArguments().add(new MutableObject<>(e));
+                        c.getArguments().add(new MutableObject<>(currExpr));
                         currExpr = c;
                         if (i == 1 && op.isBroadcastOperand(i)) {
                             BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
@@ -932,18 +932,16 @@
                     AbstractFunctionCallExpression f = createFunctionCallExpressionForBuiltinOperator(ops.get(i));
 
                     if (i == 0) {
-                        f.getArguments().add(new MutableObject<ILogicalExpression>(e));
+                        f.getArguments().add(new MutableObject<>(e));
                         currExpr = f;
                     } else {
-                        ((AbstractFunctionCallExpression) currExpr).getArguments()
-                                .add(new MutableObject<ILogicalExpression>(e));
-                        f.getArguments().add(new MutableObject<ILogicalExpression>(currExpr));
+                        ((AbstractFunctionCallExpression) currExpr).getArguments().add(new MutableObject<>(e));
+                        f.getArguments().add(new MutableObject<>(currExpr));
                         currExpr = f;
                     }
                 }
             } else { // don't forget the last expression...
-                ((AbstractFunctionCallExpression) currExpr).getArguments()
-                        .add(new MutableObject<ILogicalExpression>(e));
+                ((AbstractFunctionCallExpression) currExpr).getArguments().add(new MutableObject<>(e));
                 if (i == 1 && op.isBroadcastOperand(i)) {
                     BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
                     bcast.setObject(BroadcastSide.RIGHT);
@@ -962,11 +960,11 @@
         }
 
         LogicalVariable assignedVar = context.newVar();
-        AssignOperator a = new AssignOperator(assignedVar, new MutableObject<ILogicalExpression>(currExpr));
+        AssignOperator a = new AssignOperator(assignedVar, new MutableObject<>(currExpr));
 
         a.getInputs().add(topOp);
 
-        return new Pair<ILogicalOperator, LogicalVariable>(a, assignedVar);
+        return new Pair<>(a, assignedVar);
     }
 
     @Override
@@ -979,8 +977,7 @@
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = langExprToAlgExpression(e, topOp);
             OrderModifier m = modifIter.next();
             OrderOperator.IOrder comp = (m == OrderModifier.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
-            ord.getOrderExpressions().add(new Pair<IOrder, Mutable<ILogicalExpression>>(comp,
-                    new MutableObject<ILogicalExpression>(p.first)));
+            ord.getOrderExpressions().add(new Pair<>(comp, new MutableObject<>(p.first)));
             topOp = p.second;
         }
         ord.getInputs().add(topOp);
@@ -996,7 +993,7 @@
             RangeMapBuilder.verifyRangeOrder(oc.getRangeMap(), ascending);
             ord.getAnnotations().put(OperatorAnnotations.USE_RANGE_CONNECTOR, oc.getRangeMap());
         }
-        return new Pair<ILogicalOperator, LogicalVariable>(ord, null);
+        return new Pair<>(ord, null);
     }
 
     @Override
@@ -1011,8 +1008,7 @@
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo1 = langExprToAlgExpression(qt.getExpr(), topOp);
             topOp = eo1.second;
             LogicalVariable uVar = context.newVar(qt.getVarExpr());
-            ILogicalOperator u = new UnnestOperator(uVar,
-                    new MutableObject<ILogicalExpression>(makeUnnestExpression(eo1.first)));
+            ILogicalOperator u = new UnnestOperator(uVar, new MutableObject<>(makeUnnestExpression(eo1.first)));
 
             if (firstOp == null) {
                 firstOp = u;
@@ -1020,7 +1016,7 @@
             if (lastOp != null) {
                 u.getInputs().add(lastOp);
             }
-            lastOp = new MutableObject<ILogicalOperator>(u);
+            lastOp = new MutableObject<>(u);
         }
 
         // We make all the unnest correspond. to quantif. vars. sit on top
@@ -1033,24 +1029,24 @@
         AggregateFunctionCallExpression fAgg;
         SelectOperator s;
         if (qe.getQuantifier() == Quantifier.SOME) {
-            s = new SelectOperator(new MutableObject<ILogicalExpression>(eo2.first), false, null);
+            s = new SelectOperator(new MutableObject<>(eo2.first), false, null);
             s.getInputs().add(eo2.second);
             fAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.NON_EMPTY_STREAM,
-                    new ArrayList<Mutable<ILogicalExpression>>());
+                    new ArrayList<>());
         } else { // EVERY
-            List<Mutable<ILogicalExpression>> satExprList = new ArrayList<Mutable<ILogicalExpression>>(1);
-            satExprList.add(new MutableObject<ILogicalExpression>(eo2.first));
-            s = new SelectOperator(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+            List<Mutable<ILogicalExpression>> satExprList = new ArrayList<>(1);
+            satExprList.add(new MutableObject<>(eo2.first));
+            s = new SelectOperator(new MutableObject<>(new ScalarFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT), satExprList)), false, null);
             s.getInputs().add(eo2.second);
             fAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.EMPTY_STREAM,
-                    new ArrayList<Mutable<ILogicalExpression>>());
+                    new ArrayList<>());
         }
         LogicalVariable qeVar = context.newVar();
         AggregateOperator a = new AggregateOperator(mkSingletonArrayList(qeVar),
-                (List) mkSingletonArrayList(new MutableObject<ILogicalExpression>(fAgg)));
-        a.getInputs().add(new MutableObject<ILogicalOperator>(s));
-        return new Pair<ILogicalOperator, LogicalVariable>(a, qeVar);
+                (List) mkSingletonArrayList(new MutableObject<>(fAgg)));
+        a.getInputs().add(new MutableObject<>(s));
+        return new Pair<>(a, qeVar);
     }
 
     @Override
@@ -1065,18 +1061,18 @@
         AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(
                 FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR));
         LogicalVariable v1 = context.newVar();
-        AssignOperator a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(f));
+        AssignOperator a = new AssignOperator(v1, new MutableObject<>(f));
         Mutable<ILogicalOperator> topOp = tupSource;
         for (FieldBinding fb : rc.getFbList()) {
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo1 = langExprToAlgExpression(fb.getLeftExpr(), topOp);
-            f.getArguments().add(new MutableObject<ILogicalExpression>(eo1.first));
+            f.getArguments().add(new MutableObject<>(eo1.first));
             topOp = eo1.second;
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo2 = langExprToAlgExpression(fb.getRightExpr(), topOp);
-            f.getArguments().add(new MutableObject<ILogicalExpression>(eo2.first));
+            f.getArguments().add(new MutableObject<>(eo2.first));
             topOp = eo2.second;
         }
         a.getInputs().add(topOp);
-        return new Pair<ILogicalOperator, LogicalVariable>(a, v1);
+        return new Pair<>(a, v1);
     }
 
     @Override
@@ -1086,15 +1082,15 @@
                 ? AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR : AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR;
         AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fid));
         LogicalVariable v1 = context.newVar();
-        AssignOperator a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(f));
+        AssignOperator a = new AssignOperator(v1, new MutableObject<>(f));
         Mutable<ILogicalOperator> topOp = tupSource;
         for (Expression expr : lc.getExprList()) {
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(expr, topOp);
-            f.getArguments().add(new MutableObject<ILogicalExpression>(eo.first));
+            f.getArguments().add(new MutableObject<>(eo.first));
             topOp = eo.second;
         }
         a.getInputs().add(topOp);
-        return new Pair<ILogicalOperator, LogicalVariable>(a, v1);
+        return new Pair<>(a, v1);
     }
 
     @Override
@@ -1105,15 +1101,15 @@
         LogicalVariable v1 = context.newVar();
         AssignOperator a;
         if (u.getSign() == Sign.POSITIVE) {
-            a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(eo.first));
+            a = new AssignOperator(v1, new MutableObject<>(eo.first));
         } else {
             AbstractFunctionCallExpression m = new ScalarFunctionCallExpression(
                     FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NUMERIC_UNARY_MINUS));
-            m.getArguments().add(new MutableObject<ILogicalExpression>(eo.first));
-            a = new AssignOperator(v1, new MutableObject<ILogicalExpression>(m));
+            m.getArguments().add(new MutableObject<>(eo.first));
+            a = new AssignOperator(v1, new MutableObject<>(m));
         }
         a.getInputs().add(eo.second);
-        return new Pair<ILogicalOperator, LogicalVariable>(a, v1);
+        return new Pair<>(a, v1);
     }
 
     @Override
@@ -1121,19 +1117,18 @@
         // Should we ever get to this method?
         LogicalVariable var = context.newVar();
         LogicalVariable oldV = context.getVar(v.getVar().getId());
-        AssignOperator a = new AssignOperator(var,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oldV)));
+        AssignOperator a = new AssignOperator(var, new MutableObject<>(new VariableReferenceExpression(oldV)));
         a.getInputs().add(tupSource);
-        return new Pair<ILogicalOperator, LogicalVariable>(a, var);
+        return new Pair<>(a, var);
     }
 
     @Override
     public Pair<ILogicalOperator, LogicalVariable> visit(WhereClause w, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = langExprToAlgExpression(w.getWhereExpr(), tupSource);
-        SelectOperator s = new SelectOperator(new MutableObject<ILogicalExpression>(p.first), false, null);
+        SelectOperator s = new SelectOperator(new MutableObject<>(p.first), false, null);
         s.getInputs().add(p.second);
-        return new Pair<ILogicalOperator, LogicalVariable>(s, null);
+        return new Pair<>(s, null);
     }
 
     @Override
@@ -1150,7 +1145,7 @@
             opLim = new LimitOperator(p1.first);
             opLim.getInputs().add(p1.second);
         }
-        return new Pair<ILogicalOperator, LogicalVariable>(opLim, null);
+        return new Pair<>(opLim, null);
     }
 
     protected AbstractFunctionCallExpression createComparisonExpression(OperatorType t) {
@@ -1252,20 +1247,20 @@
             case VARIABLE_EXPRESSION: {
                 VariableReferenceExpression ve = new VariableReferenceExpression(
                         context.getVar(((VariableExpr) expr).getVar().getId()));
-                return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(ve, topOpRef);
+                return new Pair<>(ve, topOpRef);
             }
             case LITERAL_EXPRESSION: {
                 LiteralExpr val = (LiteralExpr) expr;
-                return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(new ConstantExpression(
+                return new Pair<>(new ConstantExpression(
                         new AsterixConstantValue(ConstantHelper.objectFromLiteral(val.getValue()))), topOpRef);
             }
             default: {
                 if (expressionNeedsNoNesting(expr)) {
                     Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, topOpRef);
                     ILogicalExpression exp = ((AssignOperator) p.first).getExpressions().get(0).getValue();
-                    return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(exp, p.first.getInputs().get(0));
+                    return new Pair<>(exp, p.first.getInputs().get(0));
                 } else {
-                    Mutable<ILogicalOperator> srcRef = new MutableObject<ILogicalOperator>();
+                    Mutable<ILogicalOperator> srcRef = new MutableObject<>();
                     Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, srcRef);
                     if (p.first.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
                         if (topOpRef.getValue() != null) {
@@ -1274,17 +1269,15 @@
                             // Re-binds the bottom operator reference to {@code topOpRef}.
                             rebindBottomOpRef(p.first, srcRef, topOpRef);
                         }
-                        Mutable<ILogicalOperator> top2 = new MutableObject<ILogicalOperator>(p.first);
-                        return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(
-                                new VariableReferenceExpression(p.second), top2);
+                        Mutable<ILogicalOperator> top2 = new MutableObject<>(p.first);
+                        return new Pair<>(new VariableReferenceExpression(p.second), top2);
                     } else {
                         SubplanOperator s = new SubplanOperator();
                         s.getInputs().add(topOpRef);
-                        srcRef.setValue(new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(s)));
-                        Mutable<ILogicalOperator> planRoot = new MutableObject<ILogicalOperator>(p.first);
+                        srcRef.setValue(new NestedTupleSourceOperator(new MutableObject<>(s)));
+                        Mutable<ILogicalOperator> planRoot = new MutableObject<>(p.first);
                         s.setRootOp(planRoot);
-                        return new Pair<ILogicalExpression, Mutable<ILogicalOperator>>(
-                                new VariableReferenceExpression(p.second), new MutableObject<ILogicalOperator>(s));
+                        return new Pair<>(new VariableReferenceExpression(p.second), new MutableObject<>(s));
                     }
                 }
             }
@@ -1293,23 +1286,23 @@
 
     protected Pair<ILogicalOperator, LogicalVariable> aggListifyForSubquery(LogicalVariable var,
             Mutable<ILogicalOperator> opRef, boolean bProject) {
-        AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
-                AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
-        funAgg.getArguments().add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
+        AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions
+                .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, new ArrayList<>());
+        funAgg.getArguments().add(new MutableObject<>(new VariableReferenceExpression(var)));
 
         LogicalVariable varListified = context.newSubplanOutputVar();
         AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(varListified),
-                (List) mkSingletonArrayList(new MutableObject<ILogicalExpression>(funAgg)));
+                (List) mkSingletonArrayList(new MutableObject<>(funAgg)));
         agg.getInputs().add(opRef);
         ILogicalOperator res;
         if (bProject) {
             ProjectOperator pr = new ProjectOperator(varListified);
-            pr.getInputs().add(new MutableObject<ILogicalOperator>(agg));
+            pr.getInputs().add(new MutableObject<>(agg));
             res = pr;
         } else {
             res = agg;
         }
-        return new Pair<ILogicalOperator, LogicalVariable>(res, varListified);
+        return new Pair<>(res, varListified);
     }
 
     protected Pair<ILogicalOperator, LogicalVariable> visitAndOrOperator(OperatorExpr op,
@@ -1334,14 +1327,14 @@
                             "Unexpected operator " + ops.get(i) + " in an OperatorExpr starting with " + opLogical);
                 }
             }
-            f.getArguments().add(new MutableObject<ILogicalExpression>(p.first));
+            f.getArguments().add(new MutableObject<>(p.first));
         }
 
         LogicalVariable assignedVar = context.newVar();
-        AssignOperator a = new AssignOperator(assignedVar, new MutableObject<ILogicalExpression>(f));
+        AssignOperator a = new AssignOperator(assignedVar, new MutableObject<>(f));
         a.getInputs().add(topOp);
 
-        return new Pair<ILogicalOperator, LogicalVariable>(a, assignedVar);
+        return new Pair<>(a, assignedVar);
 
     }
 
@@ -1361,7 +1354,7 @@
 
     protected ILogicalExpression makeUnnestExpression(ILogicalExpression expr) {
         List<Mutable<ILogicalExpression>> argRefs = new ArrayList<>();
-        argRefs.add(new MutableObject<ILogicalExpression>(expr));
+        argRefs.add(new MutableObject<>(expr));
         switch (expr.getExpressionTag()) {
             case CONSTANT:
             case VARIABLE: {
@@ -1463,7 +1456,7 @@
                     varMap.putAll(cloneVarMap);
 
                     // Sets the new child.
-                    childRef = new MutableObject<ILogicalOperator>(newChild);
+                    childRef = new MutableObject<>(newChild);
                     currentOperator.getInputs().set(childIndex, childRef);
                 }
 
@@ -1512,25 +1505,23 @@
             Mutable<ILogicalExpression> selectExpr, Expression branchExpression) throws AsterixException {
         context.enterSubplan();
         SubplanOperator subplanOp = new SubplanOperator();
-        subplanOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
-        Mutable<ILogicalOperator> nestedSource = new MutableObject<ILogicalOperator>(
-                new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(subplanOp)));
+        subplanOp.getInputs().add(new MutableObject<>(inputOp));
+        Mutable<ILogicalOperator> nestedSource = new MutableObject<>(
+                new NestedTupleSourceOperator(new MutableObject<>(subplanOp)));
         SelectOperator select = new SelectOperator(selectExpr, false, null);
         // The select operator cannot be moved up and down, otherwise it will cause typing issues (ASTERIXDB-1203).
         OperatorPropertiesUtil.markMovable(select, false);
         select.getInputs().add(nestedSource);
-        Pair<ILogicalOperator, LogicalVariable> pBranch = branchExpression.accept(this,
-                new MutableObject<ILogicalOperator>(select));
+        Pair<ILogicalOperator, LogicalVariable> pBranch = branchExpression.accept(this, new MutableObject<>(select));
         LogicalVariable branchVar = context.newVar();
         AggregateOperator aggOp = new AggregateOperator(Collections.singletonList(branchVar),
-                Collections.singletonList(new MutableObject<ILogicalExpression>(new AggregateFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false,
-                        Collections.singletonList(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(pBranch.second)))))));
-        aggOp.getInputs().add(new MutableObject<ILogicalOperator>(pBranch.first));
-        ILogicalPlan planForBranch = new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(aggOp));
+                Collections.singletonList(new MutableObject<>(new AggregateFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false, Collections.singletonList(
+                                new MutableObject<>(new VariableReferenceExpression(pBranch.second)))))));
+        aggOp.getInputs().add(new MutableObject<>(pBranch.first));
+        ILogicalPlan planForBranch = new ALogicalPlanImpl(new MutableObject<>(aggOp));
         subplanOp.getNestedPlans().add(planForBranch);
         context.exitSubplan();
-        return new Pair<ILogicalOperator, LogicalVariable>(subplanOp, branchVar);
+        return new Pair<>(subplanOp, branchVar);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
index 37e888d..d4c3b0e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
@@ -107,13 +106,9 @@
 
 public class ExternalIndexingOperations {
 
-    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = new ArrayList<List<String>>();
-    public static final ArrayList<IAType> FILE_INDEX_FIELD_TYPES = new ArrayList<IAType>();
-
-    static {
-        FILE_INDEX_FIELD_NAMES.add(new ArrayList<String>(Arrays.asList("")));
-        FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING);
-    }
+    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = Collections
+            .singletonList(Collections.singletonList(""));
+    public static final List<IAType> FILE_INDEX_FIELD_TYPES = Collections.singletonList(BuiltinType.ASTRING);
 
     public static boolean isIndexible(ExternalDatasetDetails ds) {
         String adapter = ds.getAdapter();
@@ -151,7 +146,7 @@
 
     public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset)
             throws AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        ArrayList<ExternalFile> files = new ArrayList<>();
         ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
         try {
             // Create the file system object
@@ -259,27 +254,27 @@
      * @param files
      * @param indexerDesc
      * @return
+     * @throws AsterixException
      * @throws Exception
      */
     private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
             JobSpecification jobSpec, IAType itemType, Dataset dataset, List<ExternalFile> files,
-            RecordDescriptor indexerDesc, AqlMetadataProvider metadataProvider) throws Exception {
+            RecordDescriptor indexerDesc) throws AsterixException {
         ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
         Map<String, String> configuration = externalDatasetDetails.getProperties();
         IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
                 externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null);
-        return new Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>(
-                new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
+        return new Pair<>(new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
                 adapterFactory.getPartitionConstraint());
     }
 
     public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
             JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
-            RecordDescriptor indexerDesc, List<ExternalFile> files) throws Exception {
+            RecordDescriptor indexerDesc, List<ExternalFile> files) throws AsterixException {
         if (files == null) {
             files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
         }
-        return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc, metadataProvider);
+        return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc);
     }
 
     /**
@@ -431,7 +426,7 @@
     public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
             List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
             AqlMetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        ArrayList<ExternalFile> files = new ArrayList<>();
         for (ExternalFile file : metadataFiles) {
             if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
                 files.add(file);
@@ -456,7 +451,7 @@
             List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
             AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
         // Create files list
-        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        ArrayList<ExternalFile> files = new ArrayList<>();
 
         for (ExternalFile metadataFile : metadataFiles) {
             if (metadataFile.getPendingOp() != ExternalFilePendingOp.PENDING_APPEND_OP) {
@@ -478,7 +473,7 @@
         CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
                 index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
                 index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, metadataProvider, files);
+        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, null, null, metadataProvider, files);
     }
 
     public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
@@ -500,10 +495,10 @@
         IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
 
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
 
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
@@ -634,10 +629,10 @@
         IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
 
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
 
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
@@ -692,10 +687,10 @@
         IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
 
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
 
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 3d91c7d..7656006 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -861,7 +861,7 @@
                         if (subType.isOpen()) {
                             isOpen = true;
                             break;
-                        };
+                        } ;
                     }
                 }
                 if (fieldExpr.second == null) {
@@ -875,6 +875,9 @@
                         throw new AlgebricksException("Typed index on \"" + fieldExpr.first
                                 + "\" field could be created only for open datatype");
                     }
+                    if (stmtCreateIndex.hasMetaField()) {
+                        throw new AlgebricksException("Typed open index can only be created on the record part");
+                    }
                     Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
                             indexName, dataverseName);
                     TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
@@ -1013,7 +1016,8 @@
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
                     index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, enforcedType, metadataProvider);
+            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, metaRecordType,
+                    keySourceIndicators, enforcedType, metadataProvider);
             if (spec == null) {
                 throw new AsterixException("Failed to create job spec for creating index '"
                         + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
@@ -1034,7 +1038,9 @@
             cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
                     index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
                     index.getGramLength(), index.getIndexType());
-            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, enforcedType, metadataProvider);
+
+            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, metaRecordType,
+                    keySourceIndicators, enforcedType, metadataProvider);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
@@ -1949,7 +1955,7 @@
 
     private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt)
-                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -2259,7 +2265,7 @@
      */
     private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-                    throws MetadataException {
+            throws MetadataException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
         List<String> functionsToApply = new ArrayList<String>();
@@ -2445,7 +2451,6 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.compactBegin(dataverseName, dataverseName + "." + datasetName);
-
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
@@ -2453,24 +2458,27 @@
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName + ".");
             }
-
             String itemTypeName = ds.getItemTypeName();
             Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), itemTypeName);
-
+            ARecordType metaRecordType = null;
+            if (ds.hasMetaPart()) {
+                metaRecordType = (ARecordType) MetadataManager.INSTANCE
+                        .getDatatype(metadataProvider.getMetadataTxnContext(), ds.getMetaItemTypeDataverseName(),
+                                ds.getMetaItemTypeName())
+                        .getDatatype();
+            }
             // Prepare jobs to compact the datatset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException(
                         "Cannot compact the extrenal dataset " + datasetName + " because it has no indexes");
             }
-
             Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
                     dataverseName);
             jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
             ARecordType enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, indexes);
-
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -2479,18 +2487,8 @@
                     }
                 }
             } else {
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                        CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName,
-                                datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(),
-                                indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(),
-                                indexes.get(j).getGramLength(), indexes.get(j).getIndexType());
-                        jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType,
-                                enforcedType, metadataProvider, ds));
-                    }
-
-                }
-                jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
+                prepareCompactJobsForExternalDataset(indexes, dataverseName, datasetName, ds, jobsToExecute,
+                        aRecordType, metaRecordType, metadataProvider, enforcedType);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2509,6 +2507,28 @@
         }
     }
 
+    private void prepareCompactJobsForExternalDataset(List<Index> indexes, String dataverseName, String datasetName,
+            Dataset ds, List<JobSpecification> jobsToExecute, ARecordType aRecordType, ARecordType metaRecordType,
+            AqlMetadataProvider metadataProvider, ARecordType enforcedType)
+            throws MetadataException, AlgebricksException {
+        for (int j = 0; j < indexes.size(); j++) {
+            if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
+                CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName, datasetName,
+                        indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(),
+                        indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(),
+                        indexes.get(j).getGramLength(), indexes.get(j).getIndexType());
+                List<Integer> keySourceIndicators = null;
+                if (ds.hasMetaPart()) {
+                    keySourceIndicators = indexes.get(j).getKeyFieldSourceIndicators();
+                }
+                jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType, metaRecordType,
+                        keySourceIndicators, enforcedType, metadataProvider));
+            }
+
+        }
+        jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
+    }
+
     private void handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
             IHyracksDataset hdc, ResultDelivery resultDelivery, ResultUtils.Stats stats) throws Exception {
 
@@ -2919,7 +2939,7 @@
     private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
-                    throws AlgebricksException, AsterixException, Exception {
+            throws AlgebricksException, AsterixException, Exception {
         // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 9052696..46b9c35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -159,7 +159,7 @@
         }
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                itemType, format.getBinaryComparatorFactoryProvider());
+                itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
         ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
         int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
 
@@ -227,7 +227,7 @@
         ARecordType metaItemType = DatasetUtils.getMetaType(metadata, dataset);
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                itemType, format.getBinaryComparatorFactoryProvider());
+                itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
         ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
         int[] blooFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
         ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index 4cb8a0f..6ed8db9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -51,42 +51,43 @@
 
 public class IndexOperations {
 
-    private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
-            .getPhysicalOptimizationConfig();
+    private static final PhysicalOptimizationConfig physicalOptimizationConfig =
+            OptimizationConfUtil.getPhysicalOptimizationConfig();
 
     public static JobSpecification buildSecondaryIndexCreationJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
-            throws AsterixException, AlgebricksException {
+            ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
                         createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
                         createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
                         createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
-                        physicalOptimizationConfig, recType, enforcedType);
+                        physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType);
         return secondaryIndexHelper.buildCreationJobSpec();
     }
 
     public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
+            ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
+                        createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
+                        createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
+                        createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
+                        physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType);
+        return secondaryIndexHelper.buildLoadingJobSpec();
+    }
+
+    public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+            AqlMetadataProvider metadataProvider, List<ExternalFile> files)
             throws AsterixException, AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
                         createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
                         createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
                         createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
-                        physicalOptimizationConfig, recType, enforcedType);
-        return secondaryIndexHelper.buildLoadingJobSpec();
-    }
-
-    public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
-            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider,
-            List<ExternalFile> files) throws AsterixException, AlgebricksException {
-        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
-                .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
-                        createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
-                        createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
-                        createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
-                        physicalOptimizationConfig, recType, enforcedType);
+                        physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType);
         secondaryIndexHelper.setExternalFiles(files);
         return secondaryIndexHelper.buildLoadingJobSpec();
     }
@@ -103,33 +104,34 @@
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
         AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
 
         // The index drop operation should be persistent regardless of temp datasets or permanent dataset.
         IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                splitsAndConstraint.first,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second,
                         new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
-        AlgebricksPartitionConstraintHelper
-                .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
         spec.addRoot(btreeDrop);
 
         return spec;
     }
 
     public static JobSpecification buildSecondaryIndexCompactJobSpec(CompiledIndexCompactStatement indexCompactStmt,
-            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider, Dataset dataset)
-            throws AsterixException, AlgebricksException {
+            ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType,
+            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(indexCompactStmt.getIndexType(), indexCompactStmt.getDataverseName(),
                         indexCompactStmt.getDatasetName(), indexCompactStmt.getIndexName(),
                         indexCompactStmt.getKeyFields(), indexCompactStmt.getKeyTypes(), indexCompactStmt.isEnforced(),
                         indexCompactStmt.getGramLength(), metadataProvider, physicalOptimizationConfig, recType,
-                        enforcedType);
+                        metaType, keySourceIndicators, enforcedType);
         return secondaryIndexHelper.buildCompactJobSpec();
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index c793f59..15ea2f8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata;
@@ -146,7 +147,7 @@
             // Assign op.
             AbstractOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes) {
-                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
             }
             AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, numSecondaryKeys,
@@ -208,7 +209,7 @@
             // Assign op.
             AbstractOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes) {
-                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
             }
             AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys,
@@ -312,9 +313,11 @@
         secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
         ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys
                 + numFilterFields];
-        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys
+                + (dataset.hasMetaPart() ? 1 : 0) + numFilterFields];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys + (dataset.hasMetaPart() ? 1 : 0)
+                + numFilterFields];
         secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
         ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
         ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
         IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
@@ -322,10 +325,19 @@
         // Record column is 0 for external datasets, numPrimaryKeys for internal ones
         int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
         for (int i = 0; i < numSecondaryKeys; i++) {
+            ARecordType sourceType;
+            int sourceColumn;
+            if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
+                sourceType = itemType;
+                sourceColumn = recordColumn;
+            } else {
+                sourceType = metaType;
+                sourceColumn = recordColumn + 1;
+            }
             secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
-                    isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(i), recordColumn);
+                    isEnforcingKeyTypes ? enforcedItemType : sourceType, secondaryKeyFields.get(i), sourceColumn);
             Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                    secondaryKeyFields.get(i), itemType);
+                    secondaryKeyFields.get(i), sourceType);
             IAType keyType = keyTypePair.first;
             anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
             ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
@@ -354,6 +366,11 @@
             }
         }
         enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
+        enforcedTypeTraits[numPrimaryKeys] = typeTraitProvider.getTypeTrait(itemType);
+        if (dataset.hasMetaPart()) {
+            enforcedRecFields[numPrimaryKeys + 1] = serdeProvider.getSerializerDeserializer(metaType);
+            enforcedTypeTraits[numPrimaryKeys + 1] = typeTraitProvider.getTypeTrait(metaType);
+        }
 
         if (numFilterFields > 0) {
             secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
@@ -362,8 +379,10 @@
             IAType type = keyTypePair.first;
             ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
             secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+            enforcedRecFields[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = serde;
+            enforcedTypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = typeTraitProvider
+                    .getTypeTrait(type);
         }
-
         secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
         enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 65afe3c..e677f54 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -47,6 +47,7 @@
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
@@ -107,6 +108,9 @@
     protected String datasetName;
     protected Dataset dataset;
     protected ARecordType itemType;
+    protected ARecordType metaType;
+    protected List<Integer> keySourceIndicators;
+    protected ISerializerDeserializer metaSerde;
     protected ISerializerDeserializer payloadSerde;
     protected IFileSplitProvider primaryFileSplitProvider;
     protected AlgebricksPartitionConstraint primaryPartitionConstraint;
@@ -131,7 +135,7 @@
     protected Map<String, String> mergePolicyFactoryProperties;
     protected RecordDescriptor enforcedRecDesc;
     protected ARecordType enforcedItemType;
-
+    protected ARecordType enforcedMetaType;
     protected int numFilterFields;
     protected List<String> filterFieldName;
     protected ITypeTraits[] filterTypeTraits;
@@ -152,8 +156,8 @@
     public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
             String datasetName, String indexName, List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes,
             boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
-            PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType enforcedType)
-            throws AsterixException, AlgebricksException {
+            PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType metaType,
+            List<Integer> keySourceIndicators, ARecordType enforcedType) throws AsterixException, AlgebricksException {
         IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
         SecondaryIndexOperationsHelper indexOperationsHelper = null;
         switch (indexType) {
@@ -178,7 +182,8 @@
             }
         }
         indexOperationsHelper.init(indexType, dataverseName, datasetName, indexName, secondaryKeyFields,
-                secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, enforcedType);
+                secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, metaType, keySourceIndicators,
+                enforcedType);
         return indexOperationsHelper;
     }
 
@@ -190,7 +195,8 @@
 
     protected void init(IndexType indexType, String dvn, String dsn, String in, List<List<String>> secondaryKeyFields,
             List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
-            ARecordType aRecType, ARecordType enforcedType) throws AsterixException, AlgebricksException {
+            ARecordType aRecType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType)
+            throws AsterixException, AlgebricksException {
         this.metadataProvider = metadataProvider;
         dataverseName = dvn == null ? metadataProvider.getDefaultDataverseName() : dvn;
         datasetName = dsn;
@@ -202,8 +208,12 @@
         }
         boolean temp = dataset.getDatasetDetails().isTemp();
         itemType = aRecType;
+        this.metaType = metaType;
+        this.keySourceIndicators = keySourceIndicators;
         enforcedItemType = enforcedType;
         payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        metaSerde = metaType == null ? null
+                : AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
         numSecondaryKeys = secondaryKeyFields.size();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
@@ -266,13 +276,20 @@
     protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
         List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
         int numPrimaryKeys = partitioningKeys.size();
-        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1
+                + (dataset.hasMetaPart() ? 1 : 0)];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
         primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
         primaryBloomFilterKeyFields = new int[numPrimaryKeys];
         ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        List<Integer> indicators = null;
+        if (dataset.hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+        }
         for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+            IAType keyType = (indicators == null || indicators.get(i) == 0)
+                    ? itemType.getSubFieldType(partitioningKeys.get(i))
+                    : metaType.getSubFieldType(partitioningKeys.get(i));
             primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
             primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
                     .getBinaryComparatorFactory(keyType, true);
@@ -281,6 +298,10 @@
         }
         primaryRecFields[numPrimaryKeys] = payloadSerde;
         primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        if (dataset.hasMetaPart()) {
+            primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
+            primaryTypeTraits[numPrimaryKeys + 1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        }
         primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
     }
 
@@ -376,14 +397,13 @@
         return asterixAssignOp;
     }
 
-    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec,
-            AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields, DatasetType dsType) {
+    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec, DatasetType dsType) {
         CastRecordDescriptor castFuncDesc = (CastRecordDescriptor) CastRecordDescriptor.FACTORY
                 .createFunctionDescriptor();
         castFuncDesc.reset(enforcedItemType, itemType);
 
         int[] outColumns = new int[1];
-        int[] projectionList = new int[1 + numPrimaryKeys];
+        int[] projectionList = new int[(dataset.hasMetaPart() ? 2 : 1) + numPrimaryKeys];
         int recordIdx;
         //external datascan operator returns a record as the first field, instead of the last in internal case
         if (dsType == DatasetType.EXTERNAL) {
@@ -396,6 +416,9 @@
         for (int i = 0; i <= numPrimaryKeys; i++) {
             projectionList[i] = i;
         }
+        if (dataset.hasMetaPart()) {
+            projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1;
+        }
         IScalarEvaluatorFactory[] castEvalFact = new IScalarEvaluatorFactory[] {
                 new ColumnAccessEvalFactory(recordIdx) };
         IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
@@ -440,10 +463,10 @@
         for (int i = 0; i < numSecondaryKeyFields; i++) {
             // Access column i, and apply 'is not null'.
             ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
-            IScalarEvaluatorFactory isNullEvalFactory = isUnknownDesc
+            IScalarEvaluatorFactory isUnknownEvalFactory = isUnknownDesc
                     .createEvaluatorFactory(new IScalarEvaluatorFactory[] { columnAccessEvalFactory });
             IScalarEvaluatorFactory notEvalFactory = notDesc
-                    .createEvaluatorFactory(new IScalarEvaluatorFactory[] { isNullEvalFactory });
+                    .createEvaluatorFactory(new IScalarEvaluatorFactory[] { isUnknownEvalFactory });
             andArgsEvalFactories[i] = notEvalFactory;
         }
         IScalarEvaluatorFactory selectCond = null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
index e072a63..7044205 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
@@ -43,8 +43,8 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -95,8 +95,8 @@
     @Override
     @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
-            List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata) throws AlgebricksException,
-            AsterixException {
+            List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata)
+            throws AlgebricksException, AsterixException {
         // Sanity checks.
         if (numPrimaryKeys > 1) {
             throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
@@ -250,10 +250,11 @@
 
         AbstractOperatorDescriptor sourceOp = primaryScanOp;
         if (isEnforcingKeyTypes) {
-            sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+            sourceOp = createCastOp(spec, dataset.getDatasetType());
             spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
         }
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys, secondaryRecDesc);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys,
+                secondaryRecDesc);
 
         // If any of the secondary fields are nullable, then add a select op
         // that filters nulls.
@@ -266,7 +267,8 @@
         AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
 
         // Sort by token + primary keys.
-        ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
+        ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories,
+                tokenKeyPairRecDesc);
 
         // Create secondary inverted index bulk load op.
         LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
@@ -296,8 +298,8 @@
         for (int i = 0; i < primaryKeyFields.length; i++) {
             primaryKeyFields[i] = numSecondaryKeys + i;
         }
-        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
+        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc,
+                tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
                 primaryPartitionConstraint);
         return tokenizerOp;
@@ -337,18 +339,18 @@
         AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
         boolean temp = dataset.getDatasetDetails().isTemp();
         if (!isPartitioned) {
-            return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                    dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+            return new LSMInvertedIndexDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                     LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
                     filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
                     invertedIndexFieldsForNonBulkLoadOps, !temp);
         } else {
-            return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                    dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+            return new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                     LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index d488d69..7c500cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -296,7 +296,7 @@
             // Assign op.
             AbstractOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes) {
-                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
             }
             AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp,
@@ -354,7 +354,7 @@
             ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
             AbstractOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes) {
-                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
             }
             // Assign op.
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-lojoin_with_meta-1.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-lojoin_with_meta-1.aql
new file mode 100644
index 0000000..788db55
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-lojoin_with_meta-1.aql
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex1 on Book(meta().num);
+create index NumIndex2 on Book(linenum:int32) enforced;
+create index NumIndex3 on Book(count1:int32) enforced;
+create index NumIndex4 on Book(count2:int32) enforced;
+
+for $t1 in dataset Book
+where $t1.linenum < 10
+order by $t1.linenum
+return {
+"linenum1": $t1.linenum,
+"count1":$t1.count1,
+"t2info": for $t2 in dataset Book
+          where $t1.count1 /* +indexnl */= $t2.count2
+          order by $t2.linenum
+          return {"linenum2": $t2.linenum,
+                  "count2":$t2.count2}
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-1.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-1.aql
new file mode 100644
index 0000000..447c25c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-1.aql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(lineid:int64) enforced;
+
+for $x in dataset Book
+for $y in dataset Book
+where $x.lineid /*+ indexnl */ = $y.lineid
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-2.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-2.aql
new file mode 100644
index 0000000..1545f33
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-2.aql
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create type LineTypeWithNum as open {
+lineid:int64
+}
+create dataset Book1(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create dataset Book2(LineTypeWithNum) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book1(lineid:int64) enforced;
+
+for $y in dataset Book2
+for $x in dataset Book1
+where $y.lineid  /*+ indexnl */=  $x.lineid
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-3.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-3.aql
new file mode 100644
index 0000000..63e105d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-3.aql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(meta().num);
+
+for $x in dataset Book
+for $y in dataset Book
+where meta($x).num /*+ indexnl */ = meta($y).num
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-4.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-4.aql
new file mode 100644
index 0000000..1fd6869
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index-nljoin_with_meta-4.aql
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book1(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create dataset Book2(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book1(meta().num);
+
+for $y in dataset Book2
+for $x in dataset Book1
+where meta($y).num /*+ indexnl */= meta($x).num
+return {"authx":$x.author,"authy":$y.author};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-1.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-1.aql
new file mode 100644
index 0000000..bc3dc0a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-1.aql
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(meta().num);
+
+for $x in dataset Book
+where meta($x).num >10
+return $x;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-2.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-2.aql
new file mode 100644
index 0000000..7f13ac3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/meta/secondary_index_with_meta-2.aql
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AuxiliaryType as open {
+  id:int32,
+  num:int32
+}
+
+create type LineType as open {
+}
+
+create dataset Book(LineType) with meta(AuxiliaryType)
+primary key meta().id;
+
+create index NumIndex on Book(lineid:int64) enforced;
+
+for $x in dataset Book
+where $x.lineid >10
+return $x;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-lojoin_with_meta-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-lojoin_with_meta-1.plan
new file mode 100644
index 0000000..0ee7293
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-lojoin_with_meta-1.plan
@@ -0,0 +1,47 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$18(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$18(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$19]  |PARTITIONED|
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STABLE_SORT [$$19(ASC), $$25(ASC)]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$19]  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STABLE_SORT [$$35(ASC)]  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- STABLE_SORT [$$33(ASC)]  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- BTREE_SEARCH  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-1.plan
new file mode 100644
index 0000000..d65bb2a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-1.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BTREE_SEARCH  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$19(ASC)]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-2.plan
new file mode 100644
index 0000000..d65bb2a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-2.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BTREE_SEARCH  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$19(ASC)]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-3.plan
new file mode 100644
index 0000000..4d919c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-3.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BTREE_SEARCH  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$21(ASC)]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-4.plan
new file mode 100644
index 0000000..4d919c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index-nljoin_with_meta-4.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BTREE_SEARCH  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$21(ASC)]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-1.plan
new file mode 100644
index 0000000..5a3c313
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-1.plan
@@ -0,0 +1,16 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- STREAM_SELECT  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- BTREE_SEARCH  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$10(ASC)]  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-2.plan
new file mode 100644
index 0000000..d29358f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/meta/secondary_index_with_meta-2.plan
@@ -0,0 +1,15 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_SELECT  |PARTITIONED|
+      -- STREAM_PROJECT  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- BTREE_SEARCH  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STABLE_SORT [$$9(ASC)]  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- BTREE_SEARCH  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.1.ddl.aql
new file mode 100644
index 0000000..3289999
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a dataset with meta part and attempt to
+ * build a secondary index on an open meta field
+ * Expected Res : Failure
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
+
+create index OpenIndex on KVStore(meta().id:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.2.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.2.ddl.aql
new file mode 100644
index 0000000..dec8611
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-open-index-in-meta/change-feed-with-meta-open-index-in-meta.2.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a dataset with meta part and attempt to
+ * build a secondary index on an open meta field
+ * Expected Res : Failure
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.1.ddl.aql
new file mode 100644
index 0000000..37e3e90
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.1.ddl.aql
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.2.update.aql
new file mode 100644
index 0000000..be3b808
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.3.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.3.ddl.aql
new file mode 100644
index 0000000..4a0b32b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.3.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+create index VBucketIndex on KVStore(meta().vbucket);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.query.aql
new file mode 100644
index 0000000..190e472
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+count(
+    for $d in dataset KVStore
+    where meta($d).vbucket = 5
+    return $d
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.5.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.5.ddl.aql
new file mode 100644
index 0000000..bf60e83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.5.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest some records then
+ * build a secondary index after the dataset has data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.1.ddl.aql
new file mode 100644
index 0000000..1a1acea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
+
+create index VBucketIndex on KVStore(meta().vbucket);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.2.update.aql
new file mode 100644
index 0000000..81fffc5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.3.query.aql
new file mode 100644
index 0000000..59f70c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.3.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+count(
+    for $d in dataset KVStore
+    where meta($d).vbucket = 5
+    return $d
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.ddl.aql
new file mode 100644
index 0000000..b1cda6d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data
+ * build secondary index on a meta field and then test ingestion of records
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.1.ddl.aql
new file mode 100644
index 0000000..6d475bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.1.ddl.aql
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.2.update.aql
new file mode 100644
index 0000000..37d67d0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.3.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.3.ddl.aql
new file mode 100644
index 0000000..5bfc09b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.3.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+create index VBucketIndex on KVStore(exp:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.query.aql
new file mode 100644
index 0000000..883be64
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+count(
+    for $d in dataset KVStore
+    where $d.exp = 15
+    return $d
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.5.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.5.ddl.aql
new file mode 100644
index 0000000..2f043ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.5.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, ingest data then create an index
+ * on a field which sometimes is missing and sometimes is null
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.1.ddl.aql
new file mode 100644
index 0000000..0ae91fa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
+
+create index OpenIndex on KVStore(id:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.2.update.aql
new file mode 100644
index 0000000..ae5390b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.3.query.aql
new file mode 100644
index 0000000..5e9b3f4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.3.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+for $d in dataset KVStore
+where $d.id = 5
+return $d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.ddl.aql
new file mode 100644
index 0000000..004cebe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on an open field in the record part then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.1.ddl.aql
new file mode 100644
index 0000000..3cf9374
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
+
+create index OpenIndex on KVStore(exp:int32) enforced;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.2.update.aql
new file mode 100644
index 0000000..9b34e70
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.3.query.aql
new file mode 100644
index 0000000..8c887f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.3.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+for $d in dataset KVStore
+where $d.exp = 15
+return $d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.ddl.aql
new file mode 100644
index 0000000..2fcc497
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create an index
+ * on a field which sometimes is missing and sometimes is null and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.1.ddl.aql
new file mode 100644
index 0000000..2471f79
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.1.ddl.aql
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use dataverse KeyVerse;
+
+create type DocumentType as open{
+id:int32
+};
+
+create type KVMetaType as open{
+"key":string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key";
+
+create feed KVChangeStream using adapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="KVMetaType"),
+    ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"),
+    ("parser"="record-with-metadata"),
+    ("format"="dcp"),
+    ("record-format"="json"),
+    ("change-feed"="true"),
+    ("key-indexes"="0"),
+    ("key-indicators"="1"),
+    ("num-of-records"="1000")
+);
+
+create index MixedIndex on KVStore(meta().vbucket,id);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.2.update.aql
new file mode 100644
index 0000000..9fa0c3d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+set wait-for-completion-feed "true";
+connect feed KVChangeStream to dataset KVStore;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.query.aql
new file mode 100644
index 0000000..5089770
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.query.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+use dataverse KeyVerse;
+
+for $d in dataset KVStore
+where $d.id = 5
+and meta($d).vbucket<100
+return $d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.4.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.4.ddl.aql
new file mode 100644
index 0000000..8f688b1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.4.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data, create a composite index
+ * on a field from meta and a field from the value and then ingest data
+ * Expected Res : Success
+ * Date         : 18th Jun 2016
+ */
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-after-ingest/change-feed-with-meta-pk-in-meta-index-after-ingest.4.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-in-meta/change-feed-with-meta-pk-in-meta-index-in-meta.4.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest/change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest.4.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.adm
new file mode 100644
index 0000000..77d1727
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-in-value/change-feed-with-meta-pk-in-meta-open-index-in-value.4.adm
@@ -0,0 +1 @@
+{ "id": 5, "name": "Ian Maxon", "exp": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.adm
new file mode 100644
index 0000000..77d1727
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta-open-index-with-missing/change-feed-with-meta-pk-in-meta-open-index-with-missing.4.adm
@@ -0,0 +1 @@
+{ "id": 5, "name": "Ian Maxon", "exp": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.adm
new file mode 100644
index 0000000..8402b39
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-with-mixed-index/change-feed-with-meta-with-mixed-index.3.adm
@@ -0,0 +1 @@
+{ "id": 5i32, "name": "Ian Maxon", "exp": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index be663e3..3907677 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -92,6 +92,42 @@
   </test-group>
   <test-group name="feeds">
     <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-with-mixed-index">
+        <output-dir compare="Text">change-feed-with-meta-with-mixed-index</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest">
+        <output-dir compare="Text">change-feed-with-meta-pk-in-meta-index-with-missing-after-ingest</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-pk-in-meta-open-index-with-missing">
+        <output-dir compare="Text">change-feed-with-meta-pk-in-meta-open-index-with-missing</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-open-index-in-meta">
+        <output-dir compare="Text">change-feed-with-meta-open-index-in-meta</output-dir>
+        <expected-error>Typed open index can only be created on the record part</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-pk-in-meta-open-index-in-value">
+        <output-dir compare="Text">change-feed-with-meta-pk-in-meta-open-index-in-value</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-pk-in-meta-index-after-ingest">
+        <output-dir compare="Text">change-feed-with-meta-pk-in-meta-index-after-ingest</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-with-meta-pk-in-meta-index-in-meta">
+        <output-dir compare="Text">change-feed-with-meta-pk-in-meta-index-in-meta</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="drop-nonexistent-feed">
         <output-dir compare="Text">drop-nonexistent-feed</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index f7fe77f..57f37da 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -153,8 +153,21 @@
         // reset the string
         strBuilder.setLength(0);
         strBuilder.append("{\"id\":" + (counter + upsertCounter) + ",\"name\":\""
-                + names[(counter + upsertCounter) % names.length] + "\",\"exp\":" + ((counter + upsertCounter) * 3)
-                + "}");
+                + names[(counter + upsertCounter) % names.length] + "\"");
+        switch (counter % 3) {
+            case 0:
+                // Missing
+                break;
+            case 1:
+                strBuilder.append(",\"exp\":null");
+                break;
+            case 2:
+                strBuilder.append(",\"exp\":" + ((counter + upsertCounter) * 3));
+                break;
+            default:
+                break;
+        }
+        strBuilder.append("}");
         byteBuff.clear();
         byteBuff.writeBytes(strBuilder.toString().getBytes(StandardCharsets.UTF_8));
     }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
index d3d681a..44c249f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateIndexStatement.java
@@ -123,6 +123,17 @@
         return Kind.CREATE_INDEX;
     }
 
+    public boolean hasMetaField() {
+        if (fieldIndexIndicators != null) {
+            for (Integer indicator : fieldIndexIndicators) {
+                if (indicator.intValue() != 0) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visit(this, arg);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
index c13ee0e..d809f4f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
@@ -209,19 +209,12 @@
     }
 
     public LogicalVariable getDataRecordVariable(List<LogicalVariable> dataScanVariables) {
-        if (hasMeta()) {
-            return dataScanVariables.get(dataScanVariables.size() - 2);
-        } else {
-            return dataScanVariables.get(dataScanVariables.size() - 1);
-        }
+        return hasMeta() ? dataScanVariables.get(dataScanVariables.size() - 2)
+                : dataScanVariables.get(dataScanVariables.size() - 1);
     }
 
     public List<LogicalVariable> getPrimaryKeyVariables(List<LogicalVariable> dataScanVariables) {
-        if (hasMeta()) {
-            return new ArrayList<>(dataScanVariables.subList(0, dataScanVariables.size() - 2));
-        } else {
-            return new ArrayList<>(dataScanVariables.subList(0, dataScanVariables.size() - 1));
-        }
+        return new ArrayList<>(dataScanVariables.subList(0, dataScanVariables.size() - (hasMeta() ? 2 : 1)));
     }
 
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f2e0424..7557b79 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -581,6 +581,14 @@
 
             ARecordType itemType =
                     (ARecordType) this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            ARecordType metaType = null;
+            List<Integer> primaryKeyIndicators = null;
+            if (dataset.hasMetaPart()) {
+                metaType =
+                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+                primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
             IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
@@ -598,7 +606,9 @@
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
                         getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(secondaryIndex.getIndexType(),
                                 secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
-                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType());
+                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+                                dataset.hasMetaPart(), primaryKeyIndicators,
+                                secondaryIndex.getKeyFieldSourceIndicators(), metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -618,7 +628,7 @@
                 // get meta item type
                 ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
                 typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
                         context.getBinaryComparatorFactoryProvider());
                 filterFields = DatasetUtils.createFilterFields(dataset);
                 btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -699,7 +709,9 @@
 
     private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
             IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
-            List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType) throws AlgebricksException {
+            List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType, boolean hasMeta,
+            List<Integer> primaryIndexKeyIndicators, List<Integer> secondaryIndexIndicators, ARecordType metaType)
+            throws AlgebricksException {
 
         IBinaryComparatorFactory[] comparatorFactories;
         ITypeTraits[] typeTraits;
@@ -711,7 +723,8 @@
         int i = 0;
         for (; i < sidxKeyFieldCount; ++i) {
             Pair<IAType, Boolean> keyPairType =
-                    Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i), recType);
+                    Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
+                            (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
             IAType keyType = keyPairType.first;
             comparatorFactories[i] =
                     AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
@@ -723,7 +736,9 @@
             try {
                 switch (dsType) {
                     case INTERNAL:
-                        keyType = recType.getSubFieldType(pidxKeyFieldNames.get(j));
+                        keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
+                                ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+                                : recType.getSubFieldType(pidxKeyFieldNames.get(j));
                         break;
                     case EXTERNAL:
                         keyType = IndexingConstants.getFieldType(j);
@@ -794,9 +809,14 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
                     splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
                             indexName, temp);
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType =
+                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+            }
 
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+                    dataset, recType, metaType, context.getBinaryComparatorFactoryProvider());
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int i = 0; i < btreeFields.length; i++) {
                 btreeFields[i] = i + numNestedSecondaryKeyFields;
@@ -1015,12 +1035,18 @@
                     dataset.getDatasetName(), dataset.getDatasetName());
             String indexName = primaryIndex.getIndexName();
 
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType =
+                        (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+            }
+
             String itemTypeName = dataset.getItemTypeName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
                     .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
             ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
+                    itemType, metaType, context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
@@ -1111,7 +1137,7 @@
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
+                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
                             indexName, temp);
@@ -1749,11 +1775,12 @@
                 numTokenFields = secondaryKeys.size() + 1;
             }
 
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
             ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
             ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
             IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
             IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
 
             IAType secondaryKeyType = null;
 
@@ -1936,9 +1963,9 @@
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
-
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
@@ -2276,7 +2303,7 @@
             ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
+                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
                             indexName, temp);
@@ -2315,17 +2342,27 @@
                     btreeFields, filterFields, !temp);
             AsterixLSMTreeUpsertOperatorDescriptor op;
 
-            ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() + 1 + numFilterFields];
+            ITypeTraits[] outputTypeTraits =
+                    new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
             ISerializerDeserializer[] outputSerDes =
-                    new ISerializerDeserializer[recordDesc.getFieldCount() + 1 + numFilterFields];
+                    new ISerializerDeserializer[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1)
+                            + numFilterFields];
             for (int j = 0; j < recordDesc.getFieldCount(); j++) {
                 outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
                 outputSerDes[j] = recordDesc.getFields()[j];
             }
-            outputSerDes[outputSerDes.length - 1 - numFilterFields] =
+            outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] =
                     FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] =
+            outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] =
                     FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+
+            if (dataset.hasMetaPart()) {
+                outputSerDes[outputSerDes.length - 1 - numFilterFields] =
+                        FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+                outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] =
+                        FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            }
+
             int fieldIdx = -1;
             if (numFilterFields > 0) {
                 String filterField = DatasetUtils.getFilterField(dataset).get(0);
@@ -2531,11 +2568,12 @@
                 numTokenFields = secondaryKeys.size() + 1;
             }
 
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
             ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
             ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
             IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
             IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
 
             IAType secondaryKeyType = null;
 
@@ -2736,8 +2774,9 @@
                 ++i;
             }
 
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index 949e6ca..bdf9ed0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -238,4 +238,15 @@
         }
         return dataverseName.compareTo(otherIndex.getDataverseName());
     }
+
+    public boolean hasMetaFields() {
+        if (keyFieldSourceIndicators != null) {
+            for (Integer indicator : keyFieldSourceIndicators) {
+                if (indicator.intValue() != 0) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
index 0ac4f56..d0af744 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
@@ -59,7 +59,8 @@
 
 public class DatasetUtils {
     public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
-            IBinaryComparatorFactoryProvider comparatorFactoryProvider) throws AlgebricksException {
+            ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+            throws AlgebricksException {
         List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
         IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
@@ -72,8 +73,11 @@
                 }
             }
         } else {
+            InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails();
             for (int i = 0; i < partitioningKeys.size(); i++) {
-                IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1)
+                        ? metaItemType.getSubFieldType(partitioningKeys.get(i))
+                        : itemType.getSubFieldType(partitioningKeys.get(i));
                 bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
             }
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
index 60ca42f..108cd33 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtils.java
@@ -204,6 +204,6 @@
      */
     public static ARecordType chooseSource(List<Integer> keySourceIndicators, int index, ARecordType recordType,
             ARecordType metaRecordType) {
-        return keySourceIndicators.get(0) == 0 ? recordType : metaRecordType;
+        return keySourceIndicators.get(index) == 0 ? recordType : metaRecordType;
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
index 8f21ffc..81e132a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastRecordDescriptor.java
@@ -39,6 +39,9 @@
 
 public class CastRecordDescriptor extends AbstractScalarFunctionDynamicDescriptor {
 
+    private CastRecordDescriptor() {
+    }
+
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 24988d4..857f1e2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -74,29 +74,31 @@
     private int presetFieldIndex = -1;
     private ARecordPointable recPointable;
     private DataOutput prevDos;
+    private final boolean hasMeta;
 
     public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
             ARecordType recordType, int filterFieldIndex) {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
-        // initialize missingWriter
-        this.missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
-        // The search key should only have the primary index and not use the permutations.
         this.key = new PermutingFrameTupleReference();
+        this.numOfPrimaryKeys = numOfPrimaryKeys;
+        missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
         int[] searchKeyPermutations = new int[numOfPrimaryKeys];
         for (int i = 0; i < searchKeyPermutations.length; i++) {
             searchKeyPermutations[i] = fieldPermutation[i];
         }
         key.setFieldPermutation(searchKeyPermutations);
-        this.numOfPrimaryKeys = numOfPrimaryKeys;
+        hasMeta = (fieldPermutation.length > numOfPrimaryKeys + 1) && (filterFieldIndex < 0
+                || (filterFieldIndex >= 0 && (fieldPermutation.length > numOfPrimaryKeys + 2)));
         if (filterFieldIndex >= 0) {
             isFiltered = true;
             this.recordType = recordType;
             this.presetFieldIndex = filterFieldIndex;
             this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
-            this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length);
+            this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
+
     }
 
     // we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -129,7 +131,6 @@
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
                     index, ctx, this);
-
             indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
             cursor = indexAccessor.createSearchCursor(false);
@@ -161,14 +162,24 @@
             dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
                     prevTuple.getFieldLength(numOfPrimaryKeys));
             tb.addFieldEndOffset();
-            // if with filters, append the filter
-            if (isFiltered) {
+            // if has meta, then append meta
+            if (hasMeta) {
                 dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1),
                         prevTuple.getFieldLength(numOfPrimaryKeys + 1));
                 tb.addFieldEndOffset();
             }
+            // if with filters, append the filter
+            if (isFiltered) {
+                dos.write(prevTuple.getFieldData(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
+                        prevTuple.getFieldStart(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
+                        prevTuple.getFieldLength(numOfPrimaryKeys + (hasMeta ? 2 : 1)));
+                tb.addFieldEndOffset();
+            }
         } else {
             addNullField();
+            if (hasMeta) {
+                addNullField();
+            }
             // if with filters, append null
             if (isFiltered) {
                 addNullField();
@@ -191,7 +202,6 @@
     //TODO: use tryDelete/tryInsert in order to prevent deadlocks
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index c06cc18..d47492c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -49,16 +49,19 @@
     // used for upsert operations
     private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
     private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
+    private final int numberOfAdditionalNonFilteringFields;
 
     public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
             List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
-            Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
+            Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload,
+            int numberOfAdditionalNonFilteringFields) {
         this.dataSourceIndex = dataSourceIndex;
         this.primaryKeyExprs = primaryKeyExprs;
         this.secondaryKeyExprs = secondaryKeyExprs;
         this.filterExpr = filterExpr;
         this.operation = operation;
         this.bulkload = bulkload;
+        this.numberOfAdditionalNonFilteringFields = numberOfAdditionalNonFilteringFields;
     }
 
     @Override
@@ -181,4 +184,8 @@
     public void setPrevAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
         this.prevAdditionalFilteringExpression = prevAdditionalFilteringExpression;
     }
+
+    public int getNumberOfAdditionalNonFilteringFields() {
+        return numberOfAdditionalNonFilteringFields;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 6a74eb9..7d6c299 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -56,6 +56,9 @@
     // previous filter (for UPSERT)
     private LogicalVariable prevFilterVar;
     private Object prevFilterType;
+    // previous additional fields (for UPSERT)
+    private List<LogicalVariable> prevAdditionalNonFilteringVars;
+    private List<Object> prevAdditionalNonFilteringTypes;
 
     public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
             List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -85,6 +88,9 @@
         if (operation == Kind.UPSERT) {
             // The upsert case also produces the previous record
             schema.add(prevRecordVar);
+            if (additionalNonFilteringExpressions != null) {
+                schema.addAll(prevAdditionalNonFilteringVars);
+            }
             if (prevFilterVar != null) {
                 schema.add(prevFilterVar);
             }
@@ -95,6 +101,9 @@
         if (prevRecordVar != null) {
             producedVariables.add(prevRecordVar);
         }
+        if (prevAdditionalNonFilteringVars != null) {
+            producedVariables.addAll(prevAdditionalNonFilteringVars);
+        }
         if (prevFilterVar != null) {
             producedVariables.add(prevFilterVar);
         }
@@ -140,6 +149,11 @@
                 target.addAllVariables(sources[0]);
                 if (operation == Kind.UPSERT) {
                     target.addVariable(prevRecordVar);
+                    if (prevAdditionalNonFilteringVars != null) {
+                        for (LogicalVariable var : prevAdditionalNonFilteringVars) {
+                            target.addVariable(var);
+                        }
+                    }
                     if (prevFilterVar != null) {
                         target.addVariable(prevFilterVar);
                     }
@@ -158,6 +172,11 @@
         PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
         if (operation == Kind.UPSERT) {
             env.setVarType(prevRecordVar, prevRecordType);
+            if (prevAdditionalNonFilteringVars != null) {
+                for (int i = 0; i < prevAdditionalNonFilteringVars.size(); i++) {
+                    env.setVarType(prevAdditionalNonFilteringVars.get(i), prevAdditionalNonFilteringTypes.get(i));
+                }
+            }
             if (prevFilterVar != null) {
                 env.setVarType(prevFilterVar, prevFilterType);
             }
@@ -224,4 +243,20 @@
     public void setPrevFilterType(Object prevFilterType) {
         this.prevFilterType = prevFilterType;
     }
+
+    public List<LogicalVariable> getPrevAdditionalNonFilteringVars() {
+        return prevAdditionalNonFilteringVars;
+    }
+
+    public void setPrevAdditionalNonFilteringVars(List<LogicalVariable> prevAdditionalNonFilteringVars) {
+        this.prevAdditionalNonFilteringVars = prevAdditionalNonFilteringVars;
+    }
+
+    public List<Object> getPrevAdditionalNonFilteringTypes() {
+        return prevAdditionalNonFilteringTypes;
+    }
+
+    public void setPrevAdditionalNonFilteringTypes(List<Object> prevAdditionalNonFilteringTypes) {
+        this.prevAdditionalNonFilteringTypes = prevAdditionalNonFilteringTypes;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 2450c6c..99123b3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -74,8 +74,8 @@
 
     @Override
     public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
-        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<LogicalVariable> newList = new ArrayList<>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         newList.addAll(op.getVariables());
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new AggregateOperator(newList, newExpressions);
@@ -84,8 +84,8 @@
     @Override
     public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
             throws AlgebricksException {
-        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<LogicalVariable> newList = new ArrayList<>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         newList.addAll(op.getVariables());
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new RunningAggregateOperator(newList, newExpressions);
@@ -99,16 +99,14 @@
 
     @Override
     public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decoList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-        ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decoList = new ArrayList<>();
+        ArrayList<ILogicalPlan> newSubplans = new ArrayList<>();
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getGroupByList()) {
-            groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
-                    deepCopyExpressionRef(pair.second)));
+            groupByList.add(new Pair<>(pair.first, deepCopyExpressionRef(pair.second)));
         }
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getDecorList()) {
-            decoList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
-                    deepCopyExpressionRef(pair.second)));
+            decoList.add(new Pair<>(pair.first, deepCopyExpressionRef(pair.second)));
         }
         GroupByOperator gbyOp = new GroupByOperator(groupByList, decoList, newSubplans);
         for (ILogicalPlan plan : op.getNestedPlans()) {
@@ -148,8 +146,8 @@
 
     @Override
     public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
-        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<LogicalVariable> newList = new ArrayList<>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         newList.addAll(op.getVariables());
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new AssignOperator(newList, newExpressions);
@@ -163,7 +161,7 @@
 
     @Override
     public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
-        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> newList = new ArrayList<>();
         newList.addAll(op.getVariables());
         return new ProjectOperator(newList);
     }
@@ -171,7 +169,7 @@
     @Override
     public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
             throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex());
     }
@@ -183,8 +181,8 @@
 
     @Override
     public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
-        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
-        ArrayList<LogicalVariable> newOutputList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> newInputList = new ArrayList<>();
+        ArrayList<LogicalVariable> newOutputList = new ArrayList<>();
         newInputList.addAll(op.getInputVariables());
         newOutputList.addAll(op.getOutputVariables());
         return new ScriptOperator(op.getScriptDescription(), newInputList, newOutputList);
@@ -192,7 +190,7 @@
 
     @Override
     public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
-        ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+        ArrayList<ILogicalPlan> newSubplans = new ArrayList<>();
         SubplanOperator subplanOp = new SubplanOperator(newSubplans);
         for (ILogicalPlan plan : op.getNestedPlans()) {
             newSubplans.add(OperatorManipulationUtil.deepCopy(plan, subplanOp));
@@ -202,11 +200,10 @@
 
     @Override
     public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
-        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> newVarMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> newVarMap = new ArrayList<>();
         List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
         for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varMap) {
-            newVarMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(triple.first, triple.second,
-                    triple.third));
+            newVarMap.add(new Triple<>(triple.first, triple.second, triple.third));
         }
         return new UnionAllOperator(newVarMap);
     }
@@ -229,31 +226,31 @@
 
     @Override
     public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
-        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> newInputList = new ArrayList<>();
         newInputList.addAll(op.getVariables());
         return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
-                new ArrayList<Object>(op.getVariableTypes()), op.propagatesInput());
+                new ArrayList<>(op.getVariableTypes()), op.propagatesInput());
     }
 
     @Override
     public ILogicalOperator visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg)
             throws AlgebricksException {
-        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> newInputList = new ArrayList<>();
         newInputList.addAll(op.getVariables());
         return new LeftOuterUnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
-                new ArrayList<Object>(op.getVariableTypes()), op.propagatesInput());
+                new ArrayList<>(op.getVariableTypes()), op.propagatesInput());
     }
 
     @Override
     public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
-        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> newInputList = new ArrayList<>();
         newInputList.addAll(op.getVariables());
         return new DataSourceScanOperator(newInputList, op.getDataSource());
     }
 
     @Override
     public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new DistinctOperator(newExpressions);
     }
@@ -265,7 +262,7 @@
 
     @Override
     public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new WriteOperator(newExpressions, op.getDataSink());
     }
@@ -273,16 +270,16 @@
     @Override
     public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
             throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newExpressions, op.getExpressions());
         return new DistributeResultOperator(newExpressions, op.getDataSink());
     }
 
     @Override
     public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
-        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
         WriteResultOperator writeResultOp = new WriteResultOperator(op.getDataSource(),
                 deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions);
@@ -293,13 +290,13 @@
     @Override
     public ILogicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg)
             throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
-        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
-        InsertDeleteUpsertOperator insertDeleteOp = new InsertDeleteUpsertOperator(op.getDataSource(),
-                deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(),
-                op.isBulkload());
+        InsertDeleteUpsertOperator insertDeleteOp =
+                new InsertDeleteUpsertOperator(op.getDataSource(), deepCopyExpressionRef(op.getPayloadExpression()),
+                        newKeyExpressions, op.getOperation(), op.isBulkload());
         insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
         return insertDeleteOp;
     }
@@ -307,32 +304,32 @@
     @Override
     public ILogicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
             throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
-        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-        Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
-                ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
-        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        Mutable<ILogicalExpression> newFilterExpression =
+                new MutableObject<>(((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
         IndexInsertDeleteUpsertOperator indexInsertDeleteOp = new IndexInsertDeleteUpsertOperator(
                 op.getDataSourceIndex(), newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression,
-                op.getOperation(), op.isBulkload());
+                op.getOperation(), op.isBulkload(), op.getNumberOfAdditionalNonFilteringFields());
         indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
         return indexInsertDeleteOp;
     }
 
     @Override
     public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
-        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<>();
         deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-        List<LogicalVariable> newTokenizeVars = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> newTokenizeVars = new ArrayList<>();
         deepCopyVars(newTokenizeVars, op.getTokenizeVars());
-        Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
-                ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
-        List<Object> newTokenizeVarTypes = new ArrayList<Object>();
+        Mutable<ILogicalExpression> newFilterExpression =
+                new MutableObject<>(((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+        List<Object> newTokenizeVarTypes = new ArrayList<>();
         deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
 
         TokenizeOperator tokenizeOp = new TokenizeOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
@@ -349,17 +346,16 @@
     private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
             List<Mutable<ILogicalExpression>> oldExprs) {
         for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
-            newExprs.add(new MutableObject<ILogicalExpression>(
-                    ((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()));
+            newExprs.add(new MutableObject<>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()));
         }
     }
 
     private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExprRef) {
         ILogicalExpression oldExpr = oldExprRef.getValue();
         if (oldExpr == null) {
-            return new MutableObject<ILogicalExpression>(null);
+            return new MutableObject<>(null);
         }
-        return new MutableObject<ILogicalExpression>(oldExpr.cloneExpression());
+        return new MutableObject<>(oldExpr.cloneExpression());
     }
 
     private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
@@ -376,12 +372,11 @@
         return newObjs;
     }
 
-    private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
-            List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
-        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>>
+            deepCopyOrderAndExpression(List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<>();
         for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs) {
-            newOrdersAndExprs
-                    .add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first, deepCopyExpressionRef(pair.second)));
+            newOrdersAndExprs.add(new Pair<>(pair.first, deepCopyExpressionRef(pair.second)));
         }
         return newOrdersAndExprs;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index f29fd6f..ce86e58 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -56,11 +56,12 @@
     private final List<LogicalVariable> additionalFilteringKeys;
     private final List<LogicalVariable> prevSecondaryKeys;
     private final LogicalVariable prevAdditionalFilteringKey;
+    private final int numOfAdditionalNonFilteringFields;
 
     public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
             IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKey) {
+            LogicalVariable prevAdditionalFilteringKey, int numOfAdditionalNonFilteringFields) {
         this.primaryKeys = primaryKeys;
         this.secondaryKeys = secondaryKeys;
         if (filterExpr != null) {
@@ -72,6 +73,7 @@
         this.additionalFilteringKeys = additionalFilteringKeys;
         this.prevSecondaryKeys = prevSecondaryKeys;
         this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
+        this.numOfAdditionalNonFilteringFields = numOfAdditionalNonFilteringFields;
     }
 
     @Override
@@ -91,6 +93,9 @@
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
+        for (int i = 0; i < numOfAdditionalNonFilteringFields; i++) {
+            scanVariables.add(new LogicalVariable(-1));
+        }
         IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
                 .computePropertiesVector(scanVariables);
         r.getLocalProperties().clear();
@@ -103,7 +108,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         IndexInsertDeleteUpsertOperator insertDeleteUpsertOp = (IndexInsertDeleteUpsertOperator) op;
         IMetadataProvider mp = context.getMetadataProvider();
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 0bc683c..3c9cddf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -83,8 +83,7 @@
             IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
-        // Why do we add $$-1 and not the payLoad variable?
-        scanVariables.add(new LogicalVariable(-1));
+        scanVariables.add(payload);
         if (additionalNonFilteringFields != null) {
             scanVariables.addAll(additionalNonFilteringFields);
         }
@@ -99,7 +98,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
         IMetadataProvider mp = context.getMetadataProvider();
         IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f1f6217..e85c35c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -45,11 +45,11 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -378,15 +378,19 @@
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         String header = getIndexOpString(op.getOperation());
-        addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
+        addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from record: ")
                 .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent));
         if (op.getAdditionalNonFilteringExpressions() != null) {
+            buffer.append(", meta: ");
             pprintExprList(op.getAdditionalNonFilteringExpressions(), buffer, indent);
         }
         buffer.append(" partitioned by ");
         pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
         if (op.getOperation() == Kind.UPSERT) {
-            buffer.append(" out: ([record-before-upsert:" + op.getPrevRecordVar() + "]) ");
+            buffer.append(" out: ([record-before-upsert:" + op.getPrevRecordVar()
+                    + ((op.getPrevAdditionalNonFilteringVars() != null)
+                            ? (", additional-before-upsert: " + op.getPrevAdditionalNonFilteringVars()) : "")
+                    + "]) ");
         }
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 6d3692f..8701851 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -352,7 +352,8 @@
                         }
                         op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
                                 additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
-                                prevSecondaryKeys, prevAdditionalFilteringKey));
+                                prevSecondaryKeys, prevAdditionalFilteringKey,
+                                opInsDel.getNumberOfAdditionalNonFilteringFields()));
                     }
                     break;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java
index c6a0035..27a767f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/TypeAwareTupleReference.java
@@ -19,14 +19,13 @@
 
 package org.apache.hyracks.storage.am.common.tuples;
 
-import static org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder.VarLenIntDecoder;
-
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder.VarLenIntDecoder;
 
 public class TypeAwareTupleReference implements ITreeIndexTupleReference {
     protected ByteBuffer buf;
@@ -36,7 +35,7 @@
     protected int nullFlagsBytes;
     protected int dataStartOff;
 
-    protected ITypeTraits[] typeTraits;
+    protected final ITypeTraits[] typeTraits;
     protected VarLenIntDecoder encDec = VarLenIntEncoderDecoder.createDecoder();
     protected int[] decodedFieldSlots;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 653c451..f62f70c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -184,7 +184,7 @@
         }
         ComparableFileName cmpBTreeFileName = null;
         ComparableFileName cmpBloomFilterFileName = null;
-        while (btreeFileIter.hasNext() && bloomFilterFileIter.hasNext()) {
+        while (btreeFileIter.hasNext() && (hasBloomFilter ? bloomFilterFileIter.hasNext() : true)) {
             cmpBTreeFileName = btreeFileIter.next();
             if (hasBloomFilter) {
                 cmpBloomFilterFileName = bloomFilterFileIter.next();
diff --git a/pom.xml b/pom.xml
index 131ef02..786ce46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,4 +44,35 @@
     <module>hyracks-fullstack</module>
     <module>asterixdb</module>
   </modules>
+  <build>
+      <pluginManagement>
+          <plugins>
+              <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+              <plugin>
+                  <groupId>org.eclipse.m2e</groupId>
+                  <artifactId>lifecycle-mapping</artifactId>
+                  <version>1.0.0</version>
+                  <configuration>
+                      <lifecycleMappingMetadata>
+                          <pluginExecutions>
+                              <pluginExecution>
+                                  <pluginExecutionFilter>
+                                      <groupId>org.apache.maven.plugins</groupId>
+                                      <artifactId>maven-checkstyle-plugin</artifactId>
+                                      <versionRange>[2.17,)</versionRange>
+                                      <goals>
+                                          <goal>check</goal>
+                                      </goals>
+                                  </pluginExecutionFilter>
+                                  <action>
+                                      <ignore></ignore>
+                                  </action>
+                              </pluginExecution>
+                          </pluginExecutions>
+                      </lifecycleMappingMetadata>
+                  </configuration>
+              </plugin>
+          </plugins>
+      </pluginManagement>
+  </build>
 </project>