ASTERIXDB-1109: Fixed deletion of records from open secondary index
 - Avoided creating calling record constructor in delete pipeline
 - Fixed the case when multiple open indexes enforce the type of the
  same field & covered it with tests

Change-Id: I41bde91401f67918365de7df19dd2f0de20c73d2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/461
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
index adce8ce..43ec793 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+// TODO: Reconsider if materialization is needed in delete pipeline
 public class IntroduceMaterializationForInsertWithSelfScanRule implements IAlgebraicRewriteRule {
 
     @Override
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 6eb3807..1ff97b3 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -22,14 +22,12 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.Stack;
 
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.asterix.aql.util.FunctionUtils;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -53,7 +51,12 @@
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -73,6 +76,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
@@ -82,7 +86,8 @@
 public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -100,11 +105,14 @@
 
         FunctionIdentifier fid = null;
         /** find the record variable */
-        InsertDeleteOperator insertOp = (InsertDeleteOperator) op1;
-        ILogicalExpression recordExpr = insertOp.getPayloadExpression().getValue();
-        List<LogicalVariable> recordVar = new ArrayList<LogicalVariable>();
+        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op1;
+        ILogicalExpression recordExpr = insertDeleteOp.getPayloadExpression().getValue();
+        LogicalVariable recordVar = null;
+        List<LogicalVariable> usedRecordVars = new ArrayList<>();
         /** assume the payload is always a single variable expression */
-        recordExpr.getUsedVariables(recordVar);
+        recordExpr.getUsedVariables(usedRecordVars);
+        if (usedRecordVars.size() == 1)
+            recordVar = usedRecordVars.get(0);
 
         /**
          * op2 is the assign operator which extract primary keys from the record
@@ -112,7 +120,7 @@
          */
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
 
-        if (recordVar.size() == 0) {
+        if (recordVar == null) {
             /**
              * For the case primary key-assignment expressions are constant
              * expressions, find assign op that creates record to be
@@ -135,9 +143,9 @@
                 }
             }
             AssignOperator assignOp2 = (AssignOperator) op2;
-            recordVar.addAll(assignOp2.getVariables());
+            recordVar = assignOp2.getVariables().get(0);
         }
-        AqlDataSource datasetSource = (AqlDataSource) insertOp.getDataSource();
+        AqlDataSource datasetSource = (AqlDataSource) insertDeleteOp.getDataSource();
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         String dataverseName = datasetSource.getId().getDataverseName();
         String datasetName = datasetSource.getId().getDatasourceName();
@@ -182,19 +190,6 @@
             op0.getInputs().clear();
         }
 
-        // Replicate Operator is applied only when doing the bulk-load.
-        AbstractLogicalOperator replicateOp = null;
-
-        if (secondaryIndexTotalCnt > 1 && insertOp.isBulkload()) {
-            // Split the logical plan into "each secondary index update branch"
-            // to replicate each <PK,RECORD> pair.
-            replicateOp = new ReplicateOperator(secondaryIndexTotalCnt);
-            replicateOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-            replicateOp.setExecutionMode(ExecutionMode.PARTITIONED);
-            context.computeAndSetTypeEnvironmentForOperator(replicateOp);
-            currentTop = replicateOp;
-        }
-
         // Prepare filtering field information
         List<String> additionalFilteringField = ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
         List<LogicalVariable> additionalFilteringVars = null;
@@ -206,53 +201,70 @@
             additionalFilteringVars = new ArrayList<LogicalVariable>();
             additionalFilteringAssignExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             additionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            prepareVarAndExpression(additionalFilteringField, recType.getFieldNames(), recordVar.get(0),
+            prepareVarAndExpression(additionalFilteringField, recType.getFieldNames(), recordVar,
                     additionalFilteringAssignExpressions, additionalFilteringVars, context);
             additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
                     additionalFilteringAssignExpressions);
             for (LogicalVariable var : additionalFilteringVars) {
-                additionalFilteringExpressions.add(new MutableObject<ILogicalExpression>(
-                        new VariableReferenceExpression(var)));
+                additionalFilteringExpressions
+                        .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
             }
         }
+        LogicalVariable enforcedRecordVar = recordVar;
 
-        // Iterate each secondary index and applying Index Update operations.
-        for (Index index : indexes) {
-            List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
-            VariableUtilities.getUsedVariables(op1, projectVars);
-            if (!index.isSecondaryIndex()) {
-                continue;
-            }
-            LogicalVariable enforcedRecordVar = recordVar.get(0);
-            hasSecondaryIndex = true;
-            //if the index is enforcing field types
-            if (index.isEnforcingKeyFileds()) {
-                try {
-                    DatasetDataSource ds = (DatasetDataSource) (insertOp.getDataSource());
-                    ARecordType insertRecType = (ARecordType) ds.getSchemaTypes()[ds.getSchemaTypes().length - 1];
-                    LogicalVariable castVar = context.newVar();
-                    ARecordType enforcedType = createEnforcedType(insertRecType, index);
+        if (insertDeleteOp.getOperation() == Kind.INSERT) {
+            try {
+                DatasetDataSource ds = (DatasetDataSource) (insertDeleteOp.getDataSource());
+                ARecordType insertRecType = (ARecordType) ds.getSchemaTypes()[ds.getSchemaTypes().length - 1];
+                LogicalVariable castVar = context.newVar();
+                ARecordType enforcedType = createEnforcedType(insertRecType, indexes);
+                if (!enforcedType.equals(insertRecType)) {
                     //introduce casting to enforced type
                     AbstractFunctionCallExpression castFunc = new ScalarFunctionCallExpression(
                             FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CAST_RECORD));
 
                     castFunc.getArguments().add(
-                            new MutableObject<ILogicalExpression>(insertOp.getPayloadExpression().getValue()));
+                            new MutableObject<ILogicalExpression>(insertDeleteOp.getPayloadExpression().getValue()));
                     TypeComputerUtilities.setRequiredAndInputTypes(castFunc, enforcedType, insertRecType);
-                    AssignOperator newAssignOperator = new AssignOperator(castVar,
+                    AssignOperator castedRecordAssignOperator = new AssignOperator(castVar,
                             new MutableObject<ILogicalExpression>(castFunc));
-                    newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-                    currentTop = newAssignOperator;
-                    //project out casted record
-                    projectVars.add(castVar);
+                    castedRecordAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+                    currentTop = castedRecordAssignOperator;
                     enforcedRecordVar = castVar;
-                    context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);
-                    context.computeAndSetTypeEnvironmentForOperator(currentTop);
                     recType = enforcedType;
-                } catch (AsterixException e) {
-                    throw new AlgebricksException(e);
+                    context.computeAndSetTypeEnvironmentForOperator(castedRecordAssignOperator);
                 }
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
             }
+        }
+        Set<LogicalVariable> projectVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getUsedVariables(op1, projectVars);
+        if (enforcedRecordVar != null)
+            projectVars.add(enforcedRecordVar);
+        ProjectOperator project = new ProjectOperator(new ArrayList<LogicalVariable>(projectVars));
+        project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+        context.computeAndSetTypeEnvironmentForOperator(project);
+        currentTop = project;
+
+        // Replicate Operator is applied only when doing the bulk-load.
+        AbstractLogicalOperator replicateOp = null;
+        if (secondaryIndexTotalCnt > 1 && insertDeleteOp.isBulkload()) {
+            // Split the logical plan into "each secondary index update branch"
+            // to replicate each <PK,RECORD> pair.
+            replicateOp = new ReplicateOperator(secondaryIndexTotalCnt);
+            replicateOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+            replicateOp.setExecutionMode(ExecutionMode.PARTITIONED);
+            context.computeAndSetTypeEnvironmentForOperator(replicateOp);
+            currentTop = replicateOp;
+        }
+
+        // Iterate each secondary index and applying Index Update operations.
+        for (Index index : indexes) {
+            if (!index.isSecondaryIndex()) {
+                continue;
+            }
+            hasSecondaryIndex = true;
 
             List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
             List<IAType> secondaryKeyTypes = index.getKeyFieldTypes();
@@ -266,39 +278,35 @@
             }
 
             AssignOperator assign = new AssignOperator(secondaryKeyVars, expressions);
-            ProjectOperator project = new ProjectOperator(projectVars);
 
+            ILogicalOperator filterOrAssignOp = null;
             if (additionalFilteringAssign != null) {
-                additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(project));
+                filterOrAssignOp = additionalFilteringAssign;
                 assign.getInputs().add(new MutableObject<ILogicalOperator>(additionalFilteringAssign));
             } else {
-                assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
+                filterOrAssignOp = assign;
             }
 
             // Only apply replicate operator when doing bulk-load
-            if (secondaryIndexTotalCnt > 1 && insertOp.isBulkload())
-                project.getInputs().add(new MutableObject<ILogicalOperator>(replicateOp));
+            if (secondaryIndexTotalCnt > 1 && insertDeleteOp.isBulkload())
+                filterOrAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(replicateOp));
             else
-                project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
-
-            context.computeAndSetTypeEnvironmentForOperator(project);
+                filterOrAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
 
             if (additionalFilteringAssign != null) {
                 context.computeAndSetTypeEnvironmentForOperator(additionalFilteringAssign);
             }
-
             context.computeAndSetTypeEnvironmentForOperator(assign);
             currentTop = assign;
 
             // BTree, Keyword, or n-gram index case
-            if (index.getIndexType() == IndexType.BTREE
-                    || index.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
+            if (index.getIndexType() == IndexType.BTREE || index.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
                     || index.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                     || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
                 for (LogicalVariable secondaryKeyVar : secondaryKeyVars) {
-                    secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
-                            secondaryKeyVar)));
+                    secondaryExpressions.add(
+                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
                 }
                 Mutable<ILogicalExpression> filterExpression = createFilterExpression(secondaryKeyVars,
                         context.getOutputTypeEnvironment(currentTop), false);
@@ -306,7 +314,7 @@
 
                 // Introduce the TokenizeOperator only when doing bulk-load,
                 // and index type is keyword or n-gram.
-                if (index.getIndexType() != IndexType.BTREE && insertOp.isBulkload()) {
+                if (index.getIndexType() != IndexType.BTREE && insertDeleteOp.isBulkload()) {
 
                     // Check whether the index is length-partitioned or not.
                     // If partitioned, [input variables to TokenizeOperator,
@@ -326,8 +334,8 @@
                     List<Mutable<ILogicalExpression>> tokenizeKeyExprs = new ArrayList<Mutable<ILogicalExpression>>();
                     LogicalVariable tokenVar = context.newVar();
                     tokenizeKeyVars.add(tokenVar);
-                    tokenizeKeyExprs.add(new MutableObject<ILogicalExpression>(
-                            new VariableReferenceExpression(tokenVar)));
+                    tokenizeKeyExprs
+                            .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(tokenVar)));
 
                     // Check the field type of the secondary key.
                     IAType secondaryKeyType = null;
@@ -345,21 +353,22 @@
                     if (isPartitioned) {
                         LogicalVariable lengthVar = context.newVar();
                         tokenizeKeyVars.add(lengthVar);
-                        tokenizeKeyExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
-                                lengthVar)));
+                        tokenizeKeyExprs
+                                .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(lengthVar)));
                         varTypes.add(BuiltinType.SHORTWITHOUTTYPEINFO);
                     }
 
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
-                            insertOp.getPrimaryKeyExpressions(), secondaryExpressions, tokenizeKeyVars,
-                            filterExpression, insertOp.getOperation(), insertOp.isBulkload(), isPartitioned, varTypes);
+                            insertDeleteOp.getPrimaryKeyExpressions(), secondaryExpressions, tokenizeKeyVars,
+                            filterExpression, insertDeleteOp.getOperation(), insertDeleteOp.isBulkload(), isPartitioned,
+                            varTypes);
                     tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assign));
                     context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
 
                     IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
-                            insertOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression,
-                            insertOp.getOperation(), insertOp.isBulkload());
+                            insertDeleteOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression,
+                            insertDeleteOp.getOperation(), insertDeleteOp.isBulkload());
                     indexUpdate.setAdditionalFilteringExpressions(additionalFilteringExpressions);
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
 
@@ -371,15 +380,15 @@
                 } else {
                     // When TokenizeOperator is not needed
                     IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
-                            insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
-                            insertOp.getOperation(), insertOp.isBulkload());
+                            insertDeleteOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+                            insertDeleteOp.getOperation(), insertDeleteOp.isBulkload());
                     indexUpdate.setAdditionalFilteringExpressions(additionalFilteringExpressions);
                     indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
 
                     currentTop = indexUpdate;
                     context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
 
-                    if (insertOp.isBulkload())
+                    if (insertDeleteOp.isBulkload())
                         op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
 
                 }
@@ -397,20 +406,17 @@
                     keyVarList.add(keyVar);
                     AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
                             FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CREATE_MBR));
-                    createMBR.getArguments().add(
-                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVars
-                                    .get(0))));
-                    createMBR.getArguments().add(
-                            new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
-                                    new AInt32(dimension)))));
-                    createMBR.getArguments().add(
-                            new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
-                                    new AInt32(i)))));
+                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
+                            new VariableReferenceExpression(secondaryKeyVars.get(0))));
+                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
+                            new ConstantExpression(new AsterixConstantValue(new AInt32(dimension)))));
+                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
+                            new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                     keyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
                 }
                 for (LogicalVariable secondaryKeyVar : keyVarList) {
-                    secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
-                            secondaryKeyVar)));
+                    secondaryExpressions.add(
+                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
                 }
                 AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
                 assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
@@ -422,14 +428,14 @@
                         context.getOutputTypeEnvironment(assignCoordinates), forceFilter);
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
                 IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
-                        insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
-                        insertOp.getOperation(), insertOp.isBulkload());
+                        insertDeleteOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+                        insertDeleteOp.getOperation(), insertDeleteOp.isBulkload());
                 indexUpdate.setAdditionalFilteringExpressions(additionalFilteringExpressions);
                 indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                 currentTop = indexUpdate;
                 context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
 
-                if (insertOp.isBulkload())
+                if (insertDeleteOp.isBulkload())
                     op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
 
             }
@@ -439,79 +445,107 @@
             return false;
         }
 
-        if (!insertOp.isBulkload()) {
+        if (!insertDeleteOp.isBulkload()) {
             op0.getInputs().clear();
             op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
         }
         return true;
     }
 
-    public static ARecordType createEnforcedType(ARecordType initialType, Index index) throws AsterixException,
-            AlgebricksException {
+    // Merges typed index fields with specified recordType, allowing indexed fields to be optional.
+    // I.e. the type { "personId":int32, "name": string, "address" : { "street": string } } with typed indexes on age:int32, address.state:string
+    //      will be merged into type { "personId":int32, "name": string, "age": int32? "address" : { "street": string, "state": string? } }
+    // Used by open indexes to enforce the type of an indexed record
+    public static ARecordType createEnforcedType(ARecordType initialType, List<Index> indexes)
+            throws AsterixException, AlgebricksException {
         ARecordType enforcedType = initialType;
-        for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
-            try {
-                Stack<Pair<ARecordType, String>> nestedTypeStack = new Stack<Pair<ARecordType, String>>();
-                List<String> splits = index.getKeyFieldNames().get(i);
-                ARecordType nestedFieldType = enforcedType;
-                boolean openRecords = false;
-                String bridgeName = nestedFieldType.getTypeName();
-                int j;
-                //Build the stack for the enforced type
-                for (j = 1; j < splits.size(); j++) {
-                    nestedTypeStack.push(new Pair<ARecordType, String>(nestedFieldType, splits.get(j - 1)));
-                    bridgeName = nestedFieldType.getTypeName();
-                    nestedFieldType = (ARecordType) enforcedType.getSubFieldType(splits.subList(0, j));
-                    if (nestedFieldType == null) {
-                        openRecords = true;
-                        break;
+        for (Index index : indexes) {
+            if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) {
+                continue;
+            }
+            for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
+                try {
+                    Stack<Pair<ARecordType, String>> nestedTypeStack = new Stack<Pair<ARecordType, String>>();
+                    List<String> splits = index.getKeyFieldNames().get(i);
+                    ARecordType nestedFieldType = enforcedType;
+                    boolean openRecords = false;
+                    String bridgeName = nestedFieldType.getTypeName();
+                    int j;
+                    //Build the stack for the enforced type
+                    for (j = 1; j < splits.size(); j++) {
+                        nestedTypeStack.push(new Pair<ARecordType, String>(nestedFieldType, splits.get(j - 1)));
+                        bridgeName = nestedFieldType.getTypeName();
+                        nestedFieldType = (ARecordType) enforcedType.getSubFieldType(splits.subList(0, j));
+                        if (nestedFieldType == null) {
+                            openRecords = true;
+                            break;
+                        }
                     }
-                }
-                if (openRecords == true) {
-                    //create the smallest record
-                    enforcedType = new ARecordType(splits.get(splits.size() - 2), new String[] { splits.get(splits
-                            .size() - 1) }, new IAType[] { AUnionType.createNullableType(index.getKeyFieldTypes()
-                            .get(i)) }, true);
-                    //create the open part of the nested field
-                    for (int k = splits.size() - 3; k > (j - 2); k--) {
-                        enforcedType = new ARecordType(splits.get(k), new String[] { splits.get(k + 1) },
-                                new IAType[] { AUnionType.createNullableType(enforcedType) }, true);
+                    if (openRecords == true) {
+                        //create the smallest record
+                        enforcedType = new ARecordType(splits.get(splits.size() - 2),
+                                new String[] { splits.get(splits.size() - 1) },
+                                new IAType[] { AUnionType.createNullableType(index.getKeyFieldTypes().get(i)) }, true);
+                        //create the open part of the nested field
+                        for (int k = splits.size() - 3; k > (j - 2); k--) {
+                            enforcedType = new ARecordType(splits.get(k), new String[] { splits.get(k + 1) },
+                                    new IAType[] { AUnionType.createNullableType(enforcedType) }, true);
+                        }
+                        //Bridge the gap
+                        Pair<ARecordType, String> gapPair = nestedTypeStack.pop();
+                        ARecordType parent = gapPair.first;
+
+                        IAType[] parentFieldTypes = ArrayUtils.addAll(parent.getFieldTypes().clone(),
+                                new IAType[] { AUnionType.createNullableType(enforcedType) });
+                        enforcedType = new ARecordType(bridgeName,
+                                ArrayUtils.addAll(parent.getFieldNames(), enforcedType.getTypeName()), parentFieldTypes,
+                                true);
+
+                    } else {
+                        //Schema is closed all the way to the field
+                        //enforced fields are either null or strongly typed
+                        LinkedHashMap<String, IAType> recordNameTypesMap = new LinkedHashMap<String, IAType>();
+                        for (j = 0; j < nestedFieldType.getFieldNames().length; j++) {
+                            recordNameTypesMap.put(nestedFieldType.getFieldNames()[j],
+                                    nestedFieldType.getFieldTypes()[j]);
+                        }
+                        // if a an enforced field already exists and the type is correct
+                        IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1));
+                        if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION
+                                && ((AUnionType) enforcedFieldType).isNullableType())
+                            enforcedFieldType = ((AUnionType) enforcedFieldType).getNullableType();
+                        if (enforcedFieldType != null && !ATypeHierarchy.canPromote(enforcedFieldType.getTypeTag(),
+                                index.getKeyFieldTypes().get(i).getTypeTag()))
+                            throw new AlgebricksException("Cannot enforce field " + index.getKeyFieldNames().get(i)
+                                    + " to have type " + index.getKeyFieldTypes().get(i));
+                        if (enforcedFieldType == null)
+                            recordNameTypesMap.put(splits.get(splits.size() - 1),
+                                    AUnionType.createNullableType(index.getKeyFieldTypes().get(i)));
+                        enforcedType = new ARecordType(nestedFieldType.getTypeName(),
+                                recordNameTypesMap.keySet().toArray(new String[recordNameTypesMap.size()]),
+                                recordNameTypesMap.values().toArray(new IAType[recordNameTypesMap.size()]),
+                                nestedFieldType.isOpen());
                     }
-                    //Bridge the gap
-                    Pair<ARecordType, String> gapPair = nestedTypeStack.pop();
-                    ARecordType parent = gapPair.first;
 
-                    IAType[] parentFieldTypes = ArrayUtils.addAll(parent.getFieldTypes().clone(),
-                            new IAType[] { AUnionType.createNullableType(enforcedType) });
-                    enforcedType = new ARecordType(bridgeName, ArrayUtils.addAll(parent.getFieldNames(),
-                            enforcedType.getTypeName()), parentFieldTypes, true);
-
-                } else {
-                    //Schema is closed all the way to the field
-                    //enforced fields are either null or strongly typed
-                    enforcedType = new ARecordType(nestedFieldType.getTypeName(), ArrayUtils.addAll(
-                            nestedFieldType.getFieldNames(), splits.get(splits.size() - 1)), ArrayUtils.addAll(
-                            nestedFieldType.getFieldTypes(),
-                            AUnionType.createNullableType(index.getKeyFieldTypes().get(i))), nestedFieldType.isOpen());
-                }
-
-                //Create the enforcedtype for the nested fields in the schema, from the ground up
-                if (nestedTypeStack.size() > 0) {
-                    while (!nestedTypeStack.isEmpty()) {
-                        Pair<ARecordType, String> nestedTypePair = nestedTypeStack.pop();
-                        ARecordType nestedRecType = nestedTypePair.first;
-                        IAType[] nestedRecTypeFieldTypes = nestedRecType.getFieldTypes().clone();
-                        nestedRecTypeFieldTypes[nestedRecType.findFieldPosition(nestedTypePair.second)] = enforcedType;
-                        enforcedType = new ARecordType(nestedRecType.getTypeName(), nestedRecType.getFieldNames(),
-                                nestedRecTypeFieldTypes, nestedRecType.isOpen());
+                    //Create the enforcedtype for the nested fields in the schema, from the ground up
+                    if (nestedTypeStack.size() > 0) {
+                        while (!nestedTypeStack.isEmpty()) {
+                            Pair<ARecordType, String> nestedTypePair = nestedTypeStack.pop();
+                            ARecordType nestedRecType = nestedTypePair.first;
+                            IAType[] nestedRecTypeFieldTypes = nestedRecType.getFieldTypes().clone();
+                            nestedRecTypeFieldTypes[nestedRecType
+                                    .findFieldPosition(nestedTypePair.second)] = enforcedType;
+                            enforcedType = new ARecordType(nestedRecType.getTypeName() + "_enforced",
+                                    nestedRecType.getFieldNames(), nestedRecTypeFieldTypes, nestedRecType.isOpen());
+                        }
                     }
-                }
 
-            } catch (AsterixException e) {
-                throw new AlgebricksException("Cannot enforce typed fields "
-                        + StringUtils.join(index.getKeyFieldNames()), e);
-            } catch (IOException e) {
-                throw new AsterixException(e);
+                } catch (AsterixException e) {
+                    throw new AlgebricksException(
+                            "Cannot enforce typed fields " + StringUtils.join(index.getKeyFieldNames()), e);
+                } catch (IOException e) {
+                    throw new AsterixException(e);
+                }
             }
         }
         return enforcedType;
@@ -520,9 +554,9 @@
     @SuppressWarnings("unchecked")
     private void prepareVarAndExpression(List<String> field, String[] fieldNames, LogicalVariable recordVar,
             List<Mutable<ILogicalExpression>> expressions, List<LogicalVariable> vars, IOptimizationContext context)
-            throws AlgebricksException {
-        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
-                recordVar));
+                    throws AlgebricksException {
+        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
+                new VariableReferenceExpression(recordVar));
         int pos = -1;
         if (field.size() == 1) {
             for (int j = 0; j < fieldNames.length; j++) {
@@ -539,14 +573,14 @@
                 for (int i = 0; i < field.size(); i++) {
                     fieldList.add(new AString(field.get(i)));
                 }
-                Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(new ConstantExpression(
-                        new AsterixConstantValue(fieldList)));
+                Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
+                        new ConstantExpression(new AsterixConstantValue(fieldList)));
                 //Create an expression for the nested case
                 func = new ScalarFunctionCallExpression(
                         FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED), varRef, fieldRef);
             } else {
-                Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(new ConstantExpression(
-                        new AsterixConstantValue(new AString(field.get(0)))));
+                Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
+                        new ConstantExpression(new AsterixConstantValue(new AString(field.get(0)))));
                 //Create an expression for the open field case (By name)
                 func = new ScalarFunctionCallExpression(
                         FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
@@ -556,8 +590,8 @@
             vars.add(newVar);
         } else {
             // Assumes the indexed field is in the closed portion of the type.
-            Mutable<ILogicalExpression> indexRef = new MutableObject<ILogicalExpression>(new ConstantExpression(
-                    new AsterixConstantValue(new AInt32(pos))));
+            Mutable<ILogicalExpression> indexRef = new MutableObject<ILogicalExpression>(
+                    new ConstantExpression(new AsterixConstantValue(new AInt32(pos))));
             AbstractFunctionCallExpression func = new ScalarFunctionCallExpression(
                     FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX), varRef, indexRef);
             expressions.add(new MutableObject<ILogicalExpression>(func));
@@ -581,8 +615,8 @@
                     FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL),
                     new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
             ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression(
-                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.NOT), new MutableObject<ILogicalExpression>(
-                            isNullFuncExpr));
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.NOT),
+                    new MutableObject<ILogicalExpression>(isNullFuncExpr));
             filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
         }
         // No nullable secondary keys.
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 25649ee..26f6bd3 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -27,13 +27,9 @@
 import org.apache.asterix.aql.base.Statement.Kind;
 import org.apache.asterix.aql.expression.CallExpr;
 import org.apache.asterix.aql.expression.FLWOGRExpression;
-import org.apache.asterix.aql.expression.FieldAccessor;
-import org.apache.asterix.aql.expression.FieldBinding;
 import org.apache.asterix.aql.expression.ForClause;
-import org.apache.asterix.aql.expression.Identifier;
 import org.apache.asterix.aql.expression.LiteralExpr;
 import org.apache.asterix.aql.expression.Query;
-import org.apache.asterix.aql.expression.RecordConstructor;
 import org.apache.asterix.aql.expression.VariableExpr;
 import org.apache.asterix.aql.expression.WhereClause;
 import org.apache.asterix.aql.literal.StringLiteral;
@@ -42,8 +38,6 @@
 import org.apache.asterix.common.functions.FunctionConstants;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
@@ -83,8 +77,8 @@
 
     // added by yasser
     public static class CompiledCreateDataverseStatement implements ICompiledStatement {
-        private String dataverseName;
-        private String format;
+        private final String dataverseName;
+        private final String format;
 
         public CompiledCreateDataverseStatement(String dataverseName, String format) {
             this.dataverseName = dataverseName;
@@ -106,7 +100,7 @@
     }
 
     public static class CompiledNodeGroupDropStatement implements ICompiledStatement {
-        private String nodeGroupName;
+        private final String nodeGroupName;
 
         public CompiledNodeGroupDropStatement(String nodeGroupName) {
             this.nodeGroupName = nodeGroupName;
@@ -123,9 +117,9 @@
     }
 
     public static class CompiledIndexDropStatement implements ICompiledStatement {
-        private String dataverseName;
-        private String datasetName;
-        private String indexName;
+        private final String dataverseName;
+        private final String datasetName;
+        private final String indexName;
 
         public CompiledIndexDropStatement(String dataverseName, String datasetName, String indexName) {
             this.dataverseName = dataverseName;
@@ -152,8 +146,8 @@
     }
 
     public static class CompiledDataverseDropStatement implements ICompiledStatement {
-        private String dataverseName;
-        private boolean ifExists;
+        private final String dataverseName;
+        private final boolean ifExists;
 
         public CompiledDataverseDropStatement(String dataverseName, boolean ifExists) {
             this.dataverseName = dataverseName;
@@ -175,7 +169,7 @@
     }
 
     public static class CompiledTypeDropStatement implements ICompiledStatement {
-        private String typeName;
+        private final String typeName;
 
         public CompiledTypeDropStatement(String nodeGroupName) {
             this.typeName = nodeGroupName;
@@ -211,7 +205,8 @@
         private final int gramLength;
 
         public CompiledCreateIndexStatement(String indexName, String dataverseName, String datasetName,
-                List<List<String>> keyFields, List<IAType> keyTypes, boolean isEnforced, int gramLength, IndexType indexType) {
+                List<List<String>> keyFields, List<IAType> keyTypes, boolean isEnforced, int gramLength,
+                IndexType indexType) {
             this.indexName = indexName;
             this.dataverseName = dataverseName;
             this.datasetName = datasetName;
@@ -222,10 +217,12 @@
             this.indexType = indexType;
         }
 
+        @Override
         public String getDatasetName() {
             return datasetName;
         }
 
+        @Override
         public String getDataverseName() {
             return dataverseName;
         }
@@ -261,11 +258,11 @@
     }
 
     public static class CompiledLoadFromFileStatement implements ICompiledDmlStatement {
-        private String dataverseName;
-        private String datasetName;
-        private boolean alreadySorted;
-        private String adapter;
-        private Map<String, String> properties;
+        private final String dataverseName;
+        private final String datasetName;
+        private final boolean alreadySorted;
+        private final String adapter;
+        private final Map<String, String> properties;
 
         public CompiledLoadFromFileStatement(String dataverseName, String datasetName, String adapter,
                 Map<String, String> properties, boolean alreadySorted) {
@@ -276,10 +273,12 @@
             this.properties = properties;
         }
 
+        @Override
         public String getDataverseName() {
             return dataverseName;
         }
 
+        @Override
         public String getDatasetName() {
             return datasetName;
         }
@@ -315,10 +314,12 @@
             this.varCounter = varCounter;
         }
 
+        @Override
         public String getDataverseName() {
             return dataverseName;
         }
 
+        @Override
         public String getDatasetName() {
             return datasetName;
         }
@@ -338,12 +339,12 @@
     }
 
     public static class CompiledConnectFeedStatement implements ICompiledDmlStatement {
-        private String dataverseName;
-        private String feedName;
-        private String datasetName;
-        private String policyName;
+        private final String dataverseName;
+        private final String feedName;
+        private final String datasetName;
+        private final String policyName;
         private Query query;
-        private int varCounter;
+        private final int varCounter;
 
         public CompiledConnectFeedStatement(String dataverseName, String feedName, String datasetName,
                 String policyName, Query query, int varCounter) {
@@ -390,7 +391,7 @@
             return policyName;
         }
     }
-    
+
     public static class CompiledSubscribeFeedStatement implements ICompiledDmlStatement {
 
         private final FeedConnectionRequest request;
@@ -432,11 +433,10 @@
 
     }
 
-
     public static class CompiledDisconnectFeedStatement implements ICompiledDmlStatement {
-        private String dataverseName;
-        private String datasetName;
-        private String feedName;
+        private final String dataverseName;
+        private final String datasetName;
+        private final String feedName;
         private Query query;
         private int varCounter;
 
@@ -476,15 +476,15 @@
     }
 
     public static class CompiledDeleteStatement implements ICompiledDmlStatement {
-        private VariableExpr var;
-        private String dataverseName;
-        private String datasetName;
-        private Expression condition;
-        private int varCounter;
-        private AqlMetadataProvider metadataProvider;
+        private final VariableExpr var;
+        private final String dataverseName;
+        private final String datasetName;
+        private final Expression condition;
+        private final int varCounter;
+        private final AqlMetadataProvider metadataProvider;
 
-        public CompiledDeleteStatement(VariableExpr var, String dataverseName, String datasetName,
-                Expression condition, int varCounter, AqlMetadataProvider metadataProvider) {
+        public CompiledDeleteStatement(VariableExpr var, String dataverseName, String datasetName, Expression condition,
+                int varCounter, AqlMetadataProvider metadataProvider) {
             this.var = var;
             this.dataverseName = dataverseName;
             this.datasetName = datasetName;
@@ -529,23 +529,7 @@
                 clauseList.add(whereClause);
             }
 
-            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-            if (dataset == null) {
-                throw new AlgebricksException("Unknown dataset " + datasetName);
-            }
-            String itemTypeName = dataset.getItemTypeName();
-            IAType itemType = metadataProvider.findType(dataset.getDataverseName(), itemTypeName);
-            ARecordType recType = (ARecordType) itemType;
-            String[] fieldNames = recType.getFieldNames();
-            List<FieldBinding> fieldBindings = new ArrayList<FieldBinding>();
-            for (int i = 0; i < fieldNames.length; i++) {
-                FieldAccessor fa = new FieldAccessor(var, new Identifier(fieldNames[i]));
-                FieldBinding fb = new FieldBinding(new LiteralExpr(new StringLiteral(fieldNames[i])), fa);
-                fieldBindings.add(fb);
-            }
-            RecordConstructor rc = new RecordConstructor(fieldBindings);
-
-            FLWOGRExpression flowgr = new FLWOGRExpression(clauseList, rc);
+            FLWOGRExpression flowgr = new FLWOGRExpression(clauseList, var);
             Query query = new Query();
             query.setBody(flowgr);
             return query;
@@ -592,7 +576,8 @@
         private final int gramLength;
 
         public CompiledIndexCompactStatement(String dataverseName, String datasetName, String indexName,
-                List<List<String>> keyFields, List<IAType> keyTypes, boolean isEnforced, int gramLength, IndexType indexType) {
+                List<List<String>> keyFields, List<IAType> keyTypes, boolean isEnforced, int gramLength,
+                IndexType indexType) {
             super(dataverseName, datasetName);
             this.indexName = indexName;
             this.keyFields = keyFields;
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
index dcfbc98..034e1f5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
@@ -39,11 +39,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.common.Job;
 import org.apache.asterix.api.common.SessionConfig;
@@ -170,10 +165,12 @@
 import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.TypeTranslator;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -191,11 +188,15 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
 
 /*
  * Provides functionality for executing a batch of AQL statements (queries included)
@@ -240,7 +241,7 @@
 
     /**
      * Compiles and submits for execution a list of AQL statements.
-     * 
+     *
      * @param hcc
      *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
      * @param hdc
@@ -380,8 +381,8 @@
 
                 case QUERY: {
                     metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                    metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
-                            || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                    metadataProvider.setResultAsyncMode(
+                            resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
                     handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery);
                     break;
                 }
@@ -473,8 +474,8 @@
                     throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
                 }
             }
-            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
+                    new Dataverse(dvName, stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -492,8 +493,8 @@
             throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
         }
         String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(
-                compactionPolicyFactoryClassName).newInstance();
+        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class
+                .forName(compactionPolicyFactoryClassName).newInstance();
         if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
             throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
         }
@@ -556,8 +557,8 @@
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName,
-                    mdTxnCtx);
+            String ngName = ngNameId != null ? ngNameId.getValue()
+                    : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -805,8 +806,8 @@
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
 
             indexName = stmtCreateIndex.getIndexName().getValue();
@@ -895,9 +896,9 @@
                 // External dataset
                 // Check if the dataset is indexible
                 if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
-                    throw new AlgebricksException("dataset using "
-                            + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
-                            + " Adapter can't be indexed");
+                    throw new AlgebricksException(
+                            "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
+                                    + " Adapter can't be indexed");
                 }
                 // check if the name of the index is valid
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
@@ -949,14 +950,14 @@
 
             //check whether there exists another enforced index on the same field
             if (stmtCreateIndex.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
-                        metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+                List<Index> indexes = MetadataManager.INSTANCE
+                        .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
                 for (Index index : indexes) {
                     if (index.getKeyFieldNames().equals(indexFields)
                             && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds())
-                        throw new AsterixException("Cannot create index " + indexName + " , enforced index "
-                                + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
-                                + "\" already exist");
+                        throw new AsterixException(
+                                "Cannot create index " + indexName + " , enforced index " + index.getIndexName()
+                                        + " on field \"" + StringUtils.join(indexFields, ',') + "\" already exist");
                 }
             }
 
@@ -968,7 +969,8 @@
 
             ARecordType enforcedType = null;
             if (stmtCreateIndex.isEnforced()) {
-                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, index);
+                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType,
+                        Lists.newArrayList(index));
             }
 
             //#. prepare to create the index artifact in NC.
@@ -1014,8 +1016,8 @@
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
             // add another new files index with PendingNoOp after deleting the index with PendingAddOp
             if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, filesIndex.getIndexName());
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                        filesIndex.getIndexName());
                 filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                 // update transaction timestamp
@@ -1056,8 +1058,8 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 try {
-                    JobSpecification jobSpec = IndexOperations
-                            .buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
+                    JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
+                            ds);
 
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
@@ -1189,8 +1191,8 @@
             for (FeedConnectionId connection : activeFeedConnections) {
                 FeedId feedId = connection.getFeedId();
                 if (feedId.getDataverse().equals(dataverseName)) {
-                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()), new Identifier(
-                            connection.getDatasetName()));
+                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()),
+                            new Identifier(connection.getDatasetName()));
                     try {
                         handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
                         if (LOGGER.isLoggable(Level.INFO)) {
@@ -1251,8 +1253,8 @@
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
             //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
-                    IMetadataEntity.PENDING_DROP_OP));
+            MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
+                    new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -1333,8 +1335,8 @@
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
 
@@ -1368,11 +1370,11 @@
 
                 //#. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(
-                        mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addDataset(mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(),
+                                ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
+                                ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
@@ -1404,18 +1406,18 @@
                     } else {
                         CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                 indexes.get(j).getIndexName());
-                        jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider,
-                                ds));
+                        jobsToExecute
+                                .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                     }
                 }
 
                 //#. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(
-                        mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addDataset(mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(),
+                                ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
+                                ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
@@ -1501,8 +1503,8 @@
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
 
             List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
@@ -1516,9 +1518,9 @@
                     }
                 }
                 if (resourceInUse) {
-                    throw new AsterixException("Dataset" + datasetName
-                            + " is currently being fed into by the following feeds " + "." + builder.toString()
-                            + "\nOperation not supported.");
+                    throw new AsterixException(
+                            "Dataset" + datasetName + " is currently being fed into by the following feeds " + "."
+                                    + builder.toString() + "\nOperation not supported.");
                 }
             }
 
@@ -1539,11 +1541,10 @@
 
                 //#. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(
-                        mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
-                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 //#. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1587,28 +1588,26 @@
                         if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
                             cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                     externalIndex.getIndexName());
-                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                    metadataProvider, ds));
+                            jobsToExecute.add(
+                                    ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                             //#. mark PendingDropOp on the existing files index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                                     externalIndex.getIndexName());
-                            MetadataManager.INSTANCE.addIndex(
-                                    mdTxnCtx,
-                                    new Index(dataverseName, datasetName, externalIndex.getIndexName(), externalIndex
-                                            .getIndexType(), externalIndex.getKeyFieldNames(),
-                                            index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), externalIndex
-                                                    .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                            MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                                    new Index(dataverseName, datasetName, externalIndex.getIndexName(),
+                                            externalIndex.getIndexType(), externalIndex.getKeyFieldNames(),
+                                            index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
+                                            externalIndex.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
                         }
                     }
                 }
 
                 //#. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(
-                        mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
-                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 //#. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1667,8 +1666,8 @@
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1764,8 +1763,8 @@
         FunctionSignature signature = stmtDropFunction.getFunctionSignature();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(), signature.getNamespace() + "."
-                + signature.getName());
+        MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(),
+                signature.getNamespace() + "." + signature.getName());
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
             if (function == null) {
@@ -1779,8 +1778,8 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(), signature.getNamespace() + "."
-                    + signature.getName());
+            MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(),
+                    signature.getNamespace() + "." + signature.getName());
         }
     }
 
@@ -1794,11 +1793,11 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         try {
-            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
-                    .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
+            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName,
+                    loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
                     loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = APIFramework
-                    .compileQuery(null, metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = APIFramework.compileQuery(null, metadataProvider, null, 0, null, sessionConfig,
+                    cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1823,13 +1822,13 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName,
-                dataverseName + "." + stmtInsert.getDatasetName(), query.getDataverses(), query.getDatasets());
+        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName, dataverseName + "." + stmtInsert.getDatasetName(),
+                query.getDataverses(), query.getDatasets());
 
         try {
             metadataProvider.setWriteTransaction(true);
-            CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-                    .getValue(), query, stmtInsert.getVarCounter());
+            CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName,
+                    stmtInsert.getDatasetName().getValue(), query, stmtInsert.getVarCounter());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1858,9 +1857,8 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE
-                .insertDeleteBegin(dataverseName, dataverseName + "." + stmtDelete.getDatasetName(),
-                        stmtDelete.getDataverses(), stmtDelete.getDatasets());
+        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName, dataverseName + "." + stmtDelete.getDatasetName(),
+                stmtDelete.getDataverses(), stmtDelete.getDatasets());
 
         try {
             metadataProvider.setWriteTransaction(true);
@@ -1889,8 +1887,8 @@
     }
 
     private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
-            ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
-            ACIDException {
+            ICompiledDmlStatement stmt)
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -1977,8 +1975,8 @@
             boolean extendingExisting = cfps.getSourcePolicyName() != null;
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
-                FeedPolicy sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
-                        metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+                FeedPolicy sourceFeedPolicy = MetadataManager.INSTANCE
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
                     sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
                             MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
@@ -2101,13 +2099,13 @@
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
 
-        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName
-                + "." + feedName);
+        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
+                dataverseName + "." + feedName);
         try {
             metadataProvider.setWriteTransaction(true);
 
-            CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), cfs
-                    .getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
+            CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(),
+                    cfs.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
 
             FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
                     metadataProvider.getMetadataTxnContext());
@@ -2165,8 +2163,8 @@
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
             }
             String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
-            boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
-                    .valueOf(waitForCompletionParam);
+            boolean waitForCompletion = waitForCompletionParam == null ? false
+                    : Boolean.valueOf(waitForCompletionParam);
             if (waitForCompletion) {
                 MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
                         dataverseName + "." + feedName);
@@ -2191,7 +2189,7 @@
     /**
      * Generates a subscription request corresponding to a connect feed request. In addition, provides a boolean
      * flag indicating if feed intake job needs to be started (source primary feed not found to be active).
-     * 
+     *
      * @param dataverse
      * @param feed
      * @param dataset
@@ -2202,7 +2200,7 @@
      */
     private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws MetadataException {
+                    throws MetadataException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
         List<String> functionsToApply = new ArrayList<String>();
@@ -2217,7 +2215,7 @@
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
             if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
                 connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
-                FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId 
+                FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
                 FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<String>());
                 sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation,
@@ -2236,7 +2234,7 @@
                 }
             }
             // register the compute feed point that represents the final output from the collection of
-            // functions that will be applied. 
+            // functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
                 IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2259,7 +2257,7 @@
     }
 
     /*
-     * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are 
+     * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are
      * available at this feed joint.
      */
     private FeedJointKey getFeedJointKey(Feed feed, MetadataTransactionContext ctx) throws MetadataException {
@@ -2307,12 +2305,12 @@
             Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                     dataverseName, cfs.getDatasetName().getValue());
             if (dataset == null) {
-                throw new AsterixException("Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse "
-                        + dataverseName);
+                throw new AsterixException(
+                        "Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName);
             }
 
-            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations.buildDisconnectFeedJobSpec(
-                    metadataProvider, connectionId);
+            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations
+                    .buildDisconnectFeedJobSpec(metadataProvider, connectionId);
             JobSpecification jobSpec = specDisconnectType.first;
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2356,17 +2354,17 @@
                 StringUtils.join(bfs.getLocations(), ','));
 
         JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs);
-        FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), bfs
-                .getSubscriptionRequest().getTargetDataset());
+        FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(),
+                bfs.getSubscriptionRequest().getTargetDataset());
         String dataverse = feedConnectionId.getFeedId().getDataverse();
         String dataset = feedConnectionId.getDatasetName();
-        MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset, dataverse + "."
-                + feedConnectionId.getFeedId().getFeedName());
+        MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset,
+                dataverse + "." + feedConnectionId.getFeedId().getFeedName());
 
         try {
 
-            JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId, bfs
-                    .getSubscriptionRequest().getPolicyParameters());
+            JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
+                    bfs.getSubscriptionRequest().getPolicyParameters());
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
@@ -2381,8 +2379,8 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset, dataverse + "."
-                    + feedConnectionId.getFeedId().getFeedName());
+            MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset,
+                    dataverse + "." + feedConnectionId.getFeedId().getFeedName());
         }
     }
 
@@ -2400,8 +2398,8 @@
         try {
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName + ".");
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName + ".");
             }
 
             String itemTypeName = ds.getItemTypeName();
@@ -2411,45 +2409,26 @@
             // Prepare jobs to compact the datatset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
-                throw new AlgebricksException("Cannot compact the extrenal dataset " + datasetName
-                        + " because it has no indexes");
+                throw new AlgebricksException(
+                        "Cannot compact the extrenal dataset " + datasetName + " because it has no indexes");
             }
 
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (indexes.get(j).isSecondaryIndex()) {
-                        CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName,
-                                datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(), indexes
-                                        .get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(), indexes.get(
-                                        j).getGramLength(), indexes.get(j).getIndexType());
-
-                        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(
-                                metadataProvider.getMetadataTxnContext(), dataverseName);
-                        jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName,
-                                metadataProvider));
-
-                    }
+            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                    dataverseName);
+            jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
+            ARecordType aRecordType = (ARecordType) dt.getDatatype();
+            ARecordType enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, indexes);
+            for (int j = 0; j < indexes.size(); j++) {
+                if (ds.getDatasetType() == DatasetType.INTERNAL && indexes.get(j).isSecondaryIndex()
+                        || ds.getDatasetType() == DatasetType.EXTERNAL
+                                && !ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
+                    CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName, datasetName,
+                            indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(),
+                            indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(),
+                            indexes.get(j).getGramLength(), indexes.get(j).getIndexType());
+                    jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType, enforcedType,
+                            metadataProvider, ds));
                 }
-            } else {
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                        CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName,
-                                datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(), indexes
-                                        .get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(), indexes.get(
-                                        j).getGramLength(), indexes.get(j).getIndexType());
-                        ARecordType aRecordType = (ARecordType) dt.getDatatype();
-                        ARecordType enforcedType = null;
-                        if (cics.isEnforced()) {
-                            enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType,
-                                    indexes.get(j));
-                        }
-                        jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType,
-                                enforcedType, metadataProvider, ds));
-
-                    }
-
-                }
-                jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2596,13 +2575,13 @@
 
             // Dataset exists ?
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
             // Dataset external ?
             if (ds.getDatasetType() != DatasetType.EXTERNAL) {
-                throw new AlgebricksException("dataset " + datasetName + " in dataverse " + dataverseName
-                        + " is not an external dataset");
+                throw new AlgebricksException(
+                        "dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset");
             }
             // Dataset has indexes ?
             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
@@ -2626,8 +2605,8 @@
 
             // Compute delta
             // Now we compare snapshot with external file system
-            if (ExternalIndexingOperations
-                    .isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles, appendedFiles)) {
+            if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
+                    appendedFiles)) {
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2803,8 +2782,8 @@
                 handlePregelixStatement(metadataProvider, runStmt, hcc);
                 break;
             default:
-                throw new AlgebricksException("The system \"" + runStmt.getSystem()
-                        + "\" specified in your run statement is not supported.");
+                throw new AlgebricksException(
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }
@@ -2833,8 +2812,8 @@
 
             // construct input paths
             Index fromIndex = null;
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom, pregelixStmt
-                    .getDatasetNameFrom().getValue());
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom,
+                    pregelixStmt.getDatasetNameFrom().getValue());
             for (Index ind : indexes) {
                 if (ind.isPrimaryIndex())
                     fromIndex = ind;
@@ -2846,8 +2825,8 @@
 
             Dataset datasetFrom = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameFrom, datasetNameFrom);
             IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
-                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName(), datasetFrom.getDatasetDetails()
-                            .isTemp()).first;
+                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName(),
+                    datasetFrom.getDatasetDetails().isTemp()).first;
             StringBuilder fromSplitsPaths = new StringBuilder();
 
             for (FileSplit f : fromSplits.getFileSplits()) {
@@ -2858,8 +2837,8 @@
 
             // Construct output paths
             Index toIndex = null;
-            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt
-                    .getDatasetNameTo().getValue());
+            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
+                    pregelixStmt.getDatasetNameTo().getValue());
             for (Index ind : indexes) {
                 if (ind.isPrimaryIndex())
                     toIndex = ind;
@@ -2871,7 +2850,8 @@
 
             Dataset datasetTo = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
             IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
-                    dataverseNameTo, datasetNameTo, toIndex.getIndexName(), datasetTo.getDatasetDetails().isTemp()).first;
+                    dataverseNameTo, datasetNameTo, toIndex.getIndexName(),
+                    datasetTo.getDatasetDetails().isTemp()).first;
             StringBuilder toSplitsPaths = new StringBuilder();
 
             for (FileSplit f : toSplits.getFileSplits()) {
@@ -2886,11 +2866,11 @@
                         pregelixStmt.getDatasetNameTo(), true);
                 this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
 
-                IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null, toDataset
-                        .getDatasetDetails().isTemp());
+                IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null,
+                        toDataset.getDatasetDetails().isTemp());
                 DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
-                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()), new Identifier(
-                                toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
+                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+                        new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
                         toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(),
                         idd, false);
                 this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
@@ -2997,7 +2977,7 @@
 
     private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
-            throws Exception {
+                    throws Exception {
         AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
@@ -3014,8 +2994,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, dataset
-                        .getDatasetDetails().isTemp());
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+                        dataset.getDatasetDetails().isTemp());
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
diff --git a/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan b/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan
index 563a75b..4c2d144 100644
--- a/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan
@@ -12,12 +12,10 @@
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- BTREE_SEARCH  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- BTREE_SEARCH  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$15(ASC)]  |PARTITIONED|
-                                        -- HASH_PARTITION_EXCHANGE [$$15]  |PARTITIONED|
-                                          -- UNNEST  |UNPARTITIONED|
-                                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                  -- STABLE_SORT [$$11(ASC)]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                                      -- UNNEST  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan b/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan
index ba4c536..4b9e8a2 100644
--- a/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-2.plan
@@ -9,22 +9,19 @@
                 -- INSERT_DELETE  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                     -- MATERIALIZE  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$8]  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
-                                -- STREAM_SELECT  |PARTITIONED|
-                                  -- ASSIGN  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- BTREE_SEARCH  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- STABLE_SORT [$$19(ASC)]  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- BTREE_SEARCH  |PARTITIONED|
-                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                        -- UNNEST  |UNPARTITIONED|
-                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- BTREE_SEARCH  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$13(ASC)]  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                  -- UNNEST  |UNPARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan b/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
index 55fc23c..7d38039 100644
--- a/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
@@ -9,20 +9,18 @@
                 -- INSERT_DELETE  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                     -- MATERIALIZE  |PARTITIONED|
-                      -- HASH_PARTITION_EXCHANGE [$$12]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$10]  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$15][$$13]  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$15]  |PARTITIONED|
-                                      -- UNNEST  |UNPARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$13]  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$11][$$9]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                                  -- UNNEST  |UNPARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$9]  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan
index c2173fb..3e031e9 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan
@@ -7,8 +7,7 @@
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- ASSIGN  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
-                  -- ASSIGN  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- DATASOURCE_SCAN  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
index 691761d..ca4a6c2 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
@@ -25,9 +25,8 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- ASSIGN  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- BTREE_SEARCH  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- BTREE_SEARCH  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan
index cfe5d35..790ac4f 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan
@@ -6,11 +6,9 @@
           -- MATERIALIZE  |PARTITIONED|
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- ASSIGN  |PARTITIONED|
-                -- STREAM_PROJECT  |PARTITIONED|
-                  -- ASSIGN  |PARTITIONED|
-                    -- STREAM_SELECT  |PARTITIONED|
-                      -- ASSIGN  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- DATASOURCE_SCAN  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- DATASOURCE_SCAN  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/skip-index/dont-skip-primary-index-search-in-delete.plan b/asterix-app/src/test/resources/optimizerts/results/skip-index/dont-skip-primary-index-search-in-delete.plan
index e2e6dff..bc9f1d1 100644
--- a/asterix-app/src/test/resources/optimizerts/results/skip-index/dont-skip-primary-index-search-in-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/skip-index/dont-skip-primary-index-search-in-delete.plan
@@ -7,9 +7,8 @@
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- ASSIGN  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
-                  -- ASSIGN  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- BTREE_SEARCH  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
-                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- BTREE_SEARCH  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan
index c882d81..31de832 100644
--- a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-ngram-index-search-in-delete.plan
@@ -11,11 +11,9 @@
                     -- MATERIALIZE  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_SELECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan
index b69dfa3..40461a8 100644
--- a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-index-search-in-delete.plan
@@ -13,11 +13,9 @@
                         -- MATERIALIZE  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- ASSIGN  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_SELECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
+                              -- STREAM_SELECT  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan
index c882d81..31de832 100644
--- a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-secondary-btree-index-search-in-delete.plan
@@ -11,11 +11,9 @@
                     -- MATERIALIZE  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_SELECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan
index c882d81..31de832 100644
--- a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-word-index-search-in-delete.plan
@@ -11,11 +11,9 @@
                     -- MATERIALIZE  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- ASSIGN  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_SELECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/index-type-collision/index-type-collision.1.ddl.aql
similarity index 87%
rename from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/index-type-collision/index-type-collision.1.ddl.aql
index 853386d..84d35b7 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/index-type-collision/index-type-collision.1.ddl.aql
@@ -21,9 +21,9 @@
 use dataverse test;
 
 create type testType as open {
-   "id": int32,
-   "value": string
+   "id": int32
 }
 
 create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+create index testIdx1 on testDS(value: int32) enforced;
+create index testIdx2 on testDS(value: string) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/index-type-promotion-collision/index-type-promotion-collision.1.ddl.aql
similarity index 87%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/index-type-promotion-collision/index-type-promotion-collision.1.ddl.aql
index 853386d..6c47032 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/index-type-promotion-collision/index-type-promotion-collision.1.ddl.aql
@@ -21,9 +21,9 @@
 use dataverse test;
 
 create type testType as open {
-   "id": int32,
-   "value": string
+   "id": int32
 }
 
 create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+create index testIdx1 on testDS(value: int64) enforced;
+create index testIdx2 on testDS(value: int32) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-type-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/record-type-collision/record-collision.1.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-type-collision/enforced-field-name-collision.1.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/record-type-collision/record-collision.1.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.1.ddl.aql
new file mode 100644
index 0000000..10a3530
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.1.ddl.aql
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description     : Test that BTree open index is used in query plan
+ *                 : define the BTree open index on a composite key (fname,lanme)
+ *                 : predicate => where $l.fname="Julio" and $l.lname="Isa"
+ * Expected Result : Success
+ * Issue           : Issue 162
+ * Date            : 27th March 2014
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type Emp as closed {
+id:int64,
+fname:string,
+lname:string,
+age:int64,
+dept:string
+}
+
+create type EmpOpen as open {
+id:int64,
+fname:string,
+age:int64,
+dept:string
+}
+
+create dataset employee(Emp) primary key id;
+
+create dataset employeeOpen(EmpOpen) primary key id;
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.2.update.aql
new file mode 100644
index 0000000..d28adff
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.2.update.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description     : Test that BTree enforced open index is used in query plan
+ *                 : define the BTree enforced open index on a composite key (fname,lanme)
+ *                 : predicate => where $l.fname="Julio" and $l.lname="Isa"
+ * Expected Result : Success
+ * Issue           : Issue 162
+ * Date            : 27th March 2014
+ */
+
+use dataverse test;
+
+load dataset employee
+using "org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/names.adm"),("format"="delimited-text"),("delimiter"="|"));
+
+insert into dataset employeeOpen (
+  for $x in dataset employee return $x
+);
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.3.ddl.aql
similarity index 60%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.3.ddl.aql
index 853386d..11b0baa 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.3.ddl.aql
@@ -16,14 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
+/*
+ * Description     : Test that BTree enforced open index is used in query plan
+ *                 : define the BTree enforced open index on a composite key (fname,lanme)
+ *                 : predicate => where $l.fname="Julio" and $l.lname="Isa"
+ * Expected Result : Success
+ * Issue           : Issue 162
+ * Date            : 27th March 2014
+ */
+
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
-}
+// create secondary index
 
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+create index idx_employee_f_l_name on employeeOpen(fname,lname:string) enforced;
+create index idx_employee_l_f_name on employeeOpen(lname:string, fname) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.4.query.aql
similarity index 60%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.4.query.aql
index 853386d..b1f2adf 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.4.query.aql
@@ -16,14 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
+/*
+ * Description     : Test that BTree enforced open index is used in query plan
+ *                 : define the BTree enforced open index on a composite key (fname,lanme)
+ *                 : predicate => where $l.fname="Julio" and $l.lname="Isa"
+ * Expected Result : Success
+ * Issue           : Issue 162
+ * Date            : 27th March 2014
+ */
+
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
+for $l in dataset('employeeOpen')
+where $l.fname="Julio" and $l.lname="Isa"
+return {
+  "id": $l.id,
+  "fname": $l.fname,
+  "lname": $l.lname,
+  "age": $l.age,
+  "dept": $l.dept
 }
-
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.5.ddl.aql
similarity index 65%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.5.ddl.aql
index 853386d..4eaf8e8 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.5.ddl.aql
@@ -16,14 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
+/*
+ * Description     : Test that BTree enforced open index is used in query plan
+ *                 : define the BTree enforced open index on a composite key (fname,lanme)
+ *                 : predicate => where $l.fname="Julio" and $l.lname="Isa"
+ * Expected Result : Success
+ * Issue           : Issue 162
+ * Date            : 27th March 2014
+ */
+
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
-}
+// create secondary index
 
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+drop index employeeOpen.idx_employee_f_l_name;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.6.query.aql
similarity index 60%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.6.query.aql
index 853386d..b1f2adf 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.6.query.aql
@@ -16,14 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
+/*
+ * Description     : Test that BTree enforced open index is used in query plan
+ *                 : define the BTree enforced open index on a composite key (fname,lanme)
+ *                 : predicate => where $l.fname="Julio" and $l.lname="Isa"
+ * Expected Result : Success
+ * Issue           : Issue 162
+ * Date            : 27th March 2014
+ */
+
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
+for $l in dataset('employeeOpen')
+where $l.fname="Julio" and $l.lname="Isa"
+return {
+  "id": $l.id,
+  "fname": $l.fname,
+  "lname": $l.lname,
+  "age": $l.age,
+  "dept": $l.dept
 }
-
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.1.ddl.aql
similarity index 68%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.1.ddl.aql
index 853386d..0efffb2 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.1.ddl.aql
@@ -20,10 +20,24 @@
 create dataverse test;
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
+create type DBLPType as closed {
+  id: int64,
+  dblpid: string,
+  title: string,
+  authors: string,
+  misc: string
 }
 
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+create type DBLPOpenType as open {
+  id: int64,
+  dblpid: string,
+  authors: string,
+  misc: string
+}
+
+create nodegroup group1 if not exists on nc1, nc2;
+
+create dataset DBLP(DBLPType)
+  primary key id on group1;
+create dataset DBLPOpen(DBLPOpenType)
+  primary key id on group1;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.2.update.aql
similarity index 62%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.2.update.aql
index 853386d..3ee5d4e 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.2.update.aql
@@ -16,14 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
-}
+load dataset DBLP
+using "org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/dblp-small/dblp-small-id.txt"),("format"="delimited-text"),("delimiter"=":")) pre-sorted;
 
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+insert into dataset test.DBLPOpen (
+	for $x in dataset test.DBLP
+	where $x.id <= 50
+	return $x
+);
+
+insert into dataset test.DBLPOpen (
+  for $c in dataset test.DBLP
+  where $c.id > 50
+  return {
+    "id": $c.id,
+    "dblpid": $c.dblpid,
+    "authors": $c.authors,
+    "misc": $c.misc
+  }
+);
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.3.ddl.aql
similarity index 78%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.3.ddl.aql
index 853386d..98338b2 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.3.ddl.aql
@@ -16,14 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
+
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
-}
-
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
+create index ngram_index on DBLPOpen(title:string) type ngram(3) enforced;
+create index keyword_index on DBLPOpen(title:string) type keyword enforced;
+create index btree_index on DBLPOpen(title:string) enforced;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.4.query.aql
similarity index 78%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.4.query.aql
index 853386d..775e97d 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.4.query.aql
@@ -16,14 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
+for $o in dataset('DBLPOpen')
+where contains($o.title, "Multimedia")
+order by $o.id
+return {
+  "id": $o.id,
+  "dblpid": $o.dblpid,
+  "title": $o.title,
+  "authors": $o.authors,
+  "misc": $o.misc
 }
 
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.5.query.aql
similarity index 74%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.5.query.aql
index 853386d..b7423e2 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.5.query.aql
@@ -16,14 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
+
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
+for $o in dataset('DBLPOpen')
+let $jacc := similarity-jaccard-check(word-tokens($o.title), word-tokens("Transactions for Cooperative Environments"), 0.5f)
+where $jacc[0]
+return {
+  "id": $o.id,
+  "dblpid": $o.dblpid,
+  "title": $o.title,
+  "authors": $o.authors,
+  "misc": $o.misc
 }
-
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.6.query.aql
similarity index 77%
copy from asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.6.query.aql
index 853386d..0e1ef29 100644
--- a/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/error-checking/enforced-field-name-collision/enforced-field-name-collision.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/open-index-enforced/index-selection/multi-index/multi-index.6.query.aql
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-drop dataverse test if exists;
-create dataverse test;
 use dataverse test;
 
-create type testType as open {
-   "id": int32,
-   "value": string
+for $o in dataset('DBLPOpen')
+where $o.title = "Multimedia Information Systems  Issues and Approaches."
+order by $o.title
+return {
+  "id": $o.id,
+  "dblpid": $o.dblpid,
+  "title": $o.title,
+  "authors": $o.authors,
+  "misc": $o.misc
 }
-
-create dataset testDS(testType) primary key id;
-create index testIdx on testDS(value: string) enforced;
diff --git a/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.1.adm b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.1.adm
new file mode 100644
index 0000000..1e19d0d
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.1.adm
@@ -0,0 +1,2 @@
+[ { "id": 881, "fname": "Julio", "age": 38, "dept": "Sales", "lname": "Isa" }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.2.adm b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.2.adm
new file mode 100644
index 0000000..1e19d0d
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index-composite-key/multi-index-composite-key.2.adm
@@ -0,0 +1,2 @@
+[ { "id": 881, "fname": "Julio", "age": 38, "dept": "Sales", "lname": "Isa" }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.1.adm b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.1.adm
new file mode 100644
index 0000000..61ee356
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.1.adm
@@ -0,0 +1,2 @@
+[ { "id": 4, "dblpid": "books/acm/kim95/ChristodoulakisK95", "authors": "Stavros Christodoulakis Leonidas Koveos", "misc": "2002-01-03 318-337 1995 Modern Database Systems db/books/collections/kim95.html#ChristodoulakisK95", "title": "Multimedia Information Systems  Issues and Approaches." }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.2.adm b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.2.adm
new file mode 100644
index 0000000..d7647ff
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.2.adm
@@ -0,0 +1,2 @@
+[ { "id": 9, "dblpid": "books/acm/kim95/Kaiser95", "authors": "Gail E. Kaiser", "misc": "2002-01-03 409-433 1995 Modern Database Systems db/books/collections/kim95.html#Kaiser95", "title": "Cooperative Transactions for Multiuser Environments." }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.3.adm b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.3.adm
new file mode 100644
index 0000000..61ee356
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-index-enforced/index-selection/multi-index/multi-index.3.adm
@@ -0,0 +1,2 @@
+[ { "id": 4, "dblpid": "books/acm/kim95/ChristodoulakisK95", "authors": "Stavros Christodoulakis Leonidas Koveos", "misc": "2002-01-03 318-337 1995 Modern Database Systems db/books/collections/kim95.html#ChristodoulakisK95", "title": "Multimedia Information Systems  Issues and Approaches." }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index d706feb..ce62d38 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2928,15 +2928,22 @@
     <test-group name="open-index-enforced">
         <test-group FilePath="open-index-enforced/error-checking">
             <test-case FilePath="open-index-enforced/error-checking">
-                <compilation-unit name="enforced-field-name-collision">
-                    <output-dir compare="Text">enforced-field-name-collision</output-dir>
+                <compilation-unit name="index-on-closed-type">
+                    <output-dir compare="Text">index-on-closed-type</output-dir>
                     <expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
                     </expected-error>
                 </compilation-unit>
             </test-case>
             <test-case FilePath="open-index-enforced/error-checking">
-                <compilation-unit name="enforced-field-type-collision">
-                    <output-dir compare="Text">enforced-field-type-collision</output-dir>
+                <compilation-unit name="index-type-collision">
+                    <output-dir compare="Text">index-type-collision</output-dir>
+                    <expected-error>org.apache.asterix.common.exceptions.AsterixException
+                    </expected-error>
+                </compilation-unit>
+            </test-case>
+            <test-case FilePath="open-index-enforced/error-checking">
+                <compilation-unit name="index-type-promotion-collision">
+                    <output-dir compare="Text">index-type-promotion-collision</output-dir>
                     <expected-error>org.apache.asterix.common.exceptions.AsterixException</expected-error>
                 </compilation-unit>
             </test-case>
@@ -2948,10 +2955,9 @@
                 </compilation-unit>
             </test-case>
             <test-case FilePath="open-index-enforced/error-checking">
-                <compilation-unit name="index-on-closed-type">
-                    <output-dir compare="Text">index-on-closed-type</output-dir>
-                    <expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
-                    </expected-error>
+                <compilation-unit name="record-type-collision">
+                    <output-dir compare="Text">record-type-collision</output-dir>
+                    <expected-error>org.apache.asterix.common.exceptions.AsterixException</expected-error>
                 </compilation-unit>
             </test-case>
         </test-group>
@@ -3086,6 +3092,16 @@
                 </compilation-unit>
             </test-case>
             <test-case FilePath="open-index-enforced/index-selection">
+                <compilation-unit name="multi-index">
+                    <output-dir compare="Text">multi-index</output-dir>
+                </compilation-unit>
+            </test-case>
+            <test-case FilePath="open-index-enforced/index-selection">
+                <compilation-unit name="multi-index-composite-key">
+                    <output-dir compare="Text">multi-index-composite-key</output-dir>
+                </compilation-unit>
+            </test-case>
+            <test-case FilePath="open-index-enforced/index-selection">
                 <compilation-unit name="orders-index-custkey">
                     <output-dir compare="Text">orders-index-custkey</output-dir>
                 </compilation-unit>
diff --git a/pom.xml b/pom.xml
index 62a8ce3..c9e26dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,8 +53,8 @@
         <global.test.includes>**/*TestSuite.java,**/*Test.java,${execution.tests}</global.test.includes>
         <global.test.excludes>${optimizer.tests},${metadata.tests},${invalid.tests},${repeated.tests}</global.test.excludes>
     <!-- Versions under dependencymanagement or used in many projects via properties -->
-        <algebricks.version>0.2.16-incubating</algebricks.version>
-        <hyracks.version>0.2.16-incubating</hyracks.version>
+        <algebricks.version>0.2.17-SNAPSHOT</algebricks.version>
+        <hyracks.version>0.2.17-SNAPSHOT</hyracks.version>
         <hadoop.version>2.2.0</hadoop.version>
         <junit.version>4.11</junit.version>
         <commons.io.version>2.4</commons.io.version>