[ASTERIXDB-1967][IDX] Pass correct override flag to Files Index

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- When creating an external index, overridesFieldTypes was passed
  incorrectly to the files index. Since the files index is not an open
  index, the flag is always set to false.

Change-Id: I0ccc216bb8bdb2e44ec7d8583faaeec452978f9e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1859
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 79d660c..2967a38 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -690,7 +690,8 @@
         }
     }
 
-    protected void validateIfResourceIsActiveInFeed(Dataset dataset) throws CompilationException {
+    protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset)
+            throws CompilationException {
         StringBuilder builder = null;
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
@@ -722,8 +723,8 @@
                     hints.get(DatasetNodegroupCardinalityHint.NAME));
             boolean valid = validation.first;
             if (!valid) {
-                throw new CompilationException("Incorrect use of hint '" + DatasetNodegroupCardinalityHint.NAME +
-                        "': " + validation.second);
+                throw new CompilationException(
+                        "Incorrect use of hint '" + DatasetNodegroupCardinalityHint.NAME + "': " + validation.second);
             } else {
                 nodegroupCardinality = Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME));
             }
@@ -737,26 +738,17 @@
 
     protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
-        ProgressState progress = ProgressState.NO_PROGRESS;
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
         List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
-
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         String indexName = null;
-        JobSpecification spec = null;
         Dataset ds = null;
         // For external datasets
-        List<ExternalFile> externalFilesSnapshot = null;
-        boolean firstExternalDatasetIndex = false;
-        boolean filesIndexReplicated = false;
-        Index filesIndex = null;
-        boolean datasetLocked = false;
         Index index = null;
         try {
             ds = metadataProvider.findDataset(dataverseName, datasetName);
@@ -768,6 +760,14 @@
             indexName = stmtCreateIndex.getIndexName().getValue();
             index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName, indexName);
+            if (index != null) {
+                if (stmtCreateIndex.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
+                }
+            }
             Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), ds.getItemTypeName());
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
@@ -831,16 +831,6 @@
 
             validateIndexKeyFields(stmtCreateIndex, keySourceIndicators, aRecordType, metaRecordType, indexFields,
                     indexFieldTypes);
-
-            if (index != null) {
-                if (stmtCreateIndex.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-                }
-            }
-
             // Checks whether a user is trying to create an inverted secondary index on a dataset
             // with a variable-length primary key.
             // Currently, we do not support this. Therefore, as a temporary solution, we print an
@@ -864,8 +854,30 @@
                 }
             }
 
+            Index newIndex = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
+                    indexFields, keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(),
+                    overridesFieldTypes, stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
+            doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    public static void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
+            Index index, EnumSet<JobFlag> jobFlags) throws Exception {
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        boolean bActiveTxn = true;
+        Index filesIndex = null;
+        boolean firstExternalDatasetIndex = false;
+        boolean datasetLocked = false;
+        List<ExternalFile> externalFilesSnapshot;
+        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
+        JobSpecification spec;
+        boolean filesIndexReplicated = false;
+        try {
+            index.setPendingOp(MetadataUtil.PENDING_ADD_OP);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                validateIfResourceIsActiveInFeed(ds);
+                validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), ds);
             } else {
                 // External dataset
                 // Check if the dataset is indexible
@@ -875,13 +887,14 @@
                                     + " Adapter can't be indexed");
                 }
                 // Check if the name of the index is valid
-                if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
+                if (!ExternalIndexingOperations.isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                     throw new AlgebricksException("external dataset index name is invalid");
                 }
 
                 // Check if the files index exist
-                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, IndexingConstants.getFilesIndexName(datasetName));
+                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                        index.getDataverseName(), index.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(index.getDatasetName()));
                 firstExternalDatasetIndex = filesIndex == null;
                 // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
@@ -889,7 +902,8 @@
                 if (firstExternalDatasetIndex) {
                     // Verify that no one has created an index before we acquire the lock
                     filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                            dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName));
+                            index.getDataverseName(), index.getDatasetName(),
+                            IndexingConstants.getFilesIndexName(index.getDatasetName()));
                     if (filesIndex != null) {
                         ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
                         firstExternalDatasetIndex = false;
@@ -900,9 +914,10 @@
                     // Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
                     // Add an entry for the files index
-                    filesIndex = new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
-                            IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, overridesFieldTypes, false, false,
+                    filesIndex = new Index(index.getDataverseName(), index.getDatasetName(),
+                            IndexingConstants.getFilesIndexName(index.getDatasetName()), IndexType.BTREE,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, false,
                             MetadataUtil.PENDING_ADD_OP);
                     MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                     // Add files to the external files index
@@ -917,48 +932,43 @@
                                 "Failed to create job spec for replicating Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    runJob(hcc, spec);
+                    runJob(hcc, spec, jobFlags);
                 }
             }
 
             // check whether there exists another enforced index on the same field
-            if (stmtCreateIndex.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE
-                        .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+            if (index.isEnforced()) {
+                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
+                        metadataProvider.getMetadataTxnContext(), index.getDataverseName(), index.getDatasetName());
                 for (Index existingIndex : indexes) {
-                    if (existingIndex.getKeyFieldNames().equals(indexFields)
-                            && !existingIndex.getKeyFieldTypes().equals(indexFieldTypes)
+                    if (existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames())
+                            && !existingIndex.getKeyFieldTypes().equals(index.getKeyFieldTypes())
                             && existingIndex.isEnforced()) {
-                        throw new CompilationException("Cannot create index " + indexName + " , enforced index "
-                                + existingIndex.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
-                                + "\" is already defined with type \"" + existingIndex.getKeyFieldTypes() + "\"");
+                        throw new CompilationException("Cannot create index " + index.getIndexName()
+                                + " , enforced index " + existingIndex.getIndexName() + " on field \""
+                                + StringUtils.join(index.getKeyFieldNames(), ',') + "\" is already defined with type \""
+                                + existingIndex.getKeyFieldTypes() + "\"");
                     }
                 }
             }
-
             // #. add a new index with PendingAddOp
-            index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
-                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes,
-                    stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-
             // #. prepare to create the index artifact in NC.
             spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index, metadataProvider);
             if (spec == null) {
-                throw new CompilationException("Failed to create job spec for creating index '"
-                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
+                throw new CompilationException("Failed to create job spec for creating index '" + ds.getDatasetName()
+                        + "." + index.getIndexName() + "'");
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
             // #. create the index artifact in NC.
-            runJob(hcc, spec);
+            runJob(hcc, spec, jobFlags);
 
             // #. flush the internal dataset for correlated policy
             if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) {
-                FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName(),
+                        index.getDatasetName());
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -970,7 +980,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            runJob(hcc, spec);
+            runJob(hcc, spec, jobFlags);
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -978,15 +988,15 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
-            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                    indexName);
+            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                    index.getDatasetName(), index.getIndexName());
             index.setPendingOp(MetadataUtil.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
             // add another new files index with PendingNoOp after deleting the index with
             // PendingAddOp
             if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                        filesIndex.getIndexName());
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                        index.getDatasetName(), filesIndex.getIndexName());
                 filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                 // update transaction timestamp
@@ -994,7 +1004,6 @@
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -1008,7 +1017,7 @@
                             ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec);
+                    runJob(hcc, jobSpec, jobFlags);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1027,7 +1036,7 @@
                     JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec);
+                    runJob(hcc, jobSpec, jobFlags);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1045,21 +1054,25 @@
                     } catch (Exception e2) {
                         e.addSuppressed(e2);
                         abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is inconsistent state: pending files for("
-                                + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
+                        throw new IllegalStateException(
+                                "System is inconsistent state: pending files for(" + index.getDataverseName() + "."
+                                        + index.getDatasetName() + ") couldn't be removed from the metadata",
+                                e);
                     }
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
                         // Drop the files index from metadata
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                                datasetName, IndexingConstants.getFilesIndexName(datasetName));
+                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
+                                index.getDataverseName(), index.getDatasetName(),
+                                IndexingConstants.getFilesIndexName(index.getDatasetName()));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
                         e.addSuppressed(e2);
                         abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
-                                + "." + datasetName + "." + IndexingConstants.getFilesIndexName(datasetName)
+                        throw new IllegalStateException("System is inconsistent state: pending index("
+                                + index.getDataverseName() + "." + index.getDatasetName() + "."
+                                + IndexingConstants.getFilesIndexName(index.getDatasetName())
                                 + ") couldn't be removed from the metadata", e);
                     }
                 }
@@ -1067,19 +1080,19 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName, indexName);
+                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
+                            index.getDataverseName(), index.getDatasetName(), index.getIndexName());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is in inconsistent state: pending index("
+                            + index.getDataverseName() + "." + index.getDatasetName() + "." + index.getIndexName()
+                            + ") couldn't be removed from the metadata", e);
                 }
             }
             throw e;
         } finally {
-            metadataProvider.getLocks().unlock();
             if (datasetLocked) {
                 ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
             }
@@ -2389,6 +2402,11 @@
     }
 
     private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception {
+        runJob(hcc, jobSpec, jobFlags);
+    }
+
+    private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+            throws Exception {
         JobUtils.runJob(hcc, jobSpec, jobFlags, true);
     }