[NO ISSUE][COMP] Introduce QueryTranslator.validateIndexType()

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Introduce QueryTranslator.validateIndexType() that
  can be overriden by product extensions to customize
  supported index types.

Change-Id: I000ba078579b8e0a96d0219cefad137f41abf791
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6204
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3906bd5..818fbd4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -878,13 +878,14 @@
         String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
         String indexName = stmtCreateIndex.getIndexName().getValue();
+        IndexType indexType = stmtCreateIndex.getIndexType();
         List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         String datasetFullyQualifiedName = dataverseName + "." + datasetName;
         boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
-        Dataset ds = null;
-        Index index = null;
+        Dataset ds;
+        Index index;
         MetadataLockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName,
                 datasetFullyQualifiedName);
         try {
@@ -894,6 +895,9 @@
                         dataverseName);
             }
 
+            DatasetType datasetType = ds.getDatasetType();
+            validateIndexType(datasetType, indexType, isSecondaryPrimary, sourceLoc);
+
             index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName, indexName);
             if (index != null) {
@@ -904,14 +908,11 @@
                     throw new CompilationException(ErrorCode.INDEX_EXISTS, sourceLoc, indexName);
                 }
             }
+
             // find keySourceIndicators for secondary primary index since the parser isn't aware of them
-            if (isSecondaryPrimary && ds.getDatasetType() == DatasetType.INTERNAL) {
+            if (isSecondaryPrimary && datasetType == DatasetType.INTERNAL) {
                 keySourceIndicators = ((InternalDatasetDetails) ds.getDatasetDetails()).getKeySourceIndicator();
             }
-            // disable creating secondary primary index on an external dataset
-            if (isSecondaryPrimary && ds.getDatasetType() == DatasetType.EXTERNAL) {
-                throw new AsterixException(ErrorCode.CANNOT_CREATE_SEC_PRIMARY_IDX_ON_EXT_DATASET);
-            }
             // disable creating an index on meta fields (fields with source indicator == 1 are meta fields)
             if (keySourceIndicators.stream().anyMatch(fieldSource -> fieldSource == 1) && !isSecondaryPrimary) {
                 throw new AsterixException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -956,9 +957,8 @@
                 if (fieldExpr.second == null) {
                     fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size()));
                 } else {
-                    if (!stmtCreateIndex.isEnforced() && stmtCreateIndex.getIndexType() != IndexType.BTREE) {
-                        throw new AsterixException(ErrorCode.INDEX_ILLEGAL_NON_ENFORCED_TYPED, sourceLoc,
-                                stmtCreateIndex.getIndexType());
+                    if (!stmtCreateIndex.isEnforced() && indexType != IndexType.BTREE) {
+                        throw new AsterixException(ErrorCode.INDEX_ILLEGAL_NON_ENFORCED_TYPED, sourceLoc, indexType);
                     }
                     if (stmtCreateIndex.isEnforced() && !fieldExpr.second.isUnknownable()) {
                         throw new AsterixException(ErrorCode.INDEX_ILLEGAL_ENFORCED_NON_OPTIONAL, sourceLoc,
@@ -1012,10 +1012,10 @@
             // Currently, we do not support this. Therefore, as a temporary solution, we
             // print an
             // error message and stop.
-            if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            if (indexType == IndexType.SINGLE_PARTITION_WORD_INVIX
+                    || indexType == IndexType.SINGLE_PARTITION_NGRAM_INVIX
+                    || indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                    || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
                 List<List<String>> partitioningKeys = ds.getPrimaryKeys();
                 for (List<String> partitioningKey : partitioningKeys) {
                     IAType keyType = aRecordType.getSubFieldType(partitioningKey);
@@ -1032,9 +1032,9 @@
                 }
             }
 
-            Index newIndex = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-                    indexFields, keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(),
-                    overridesFieldTypes, stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
+            Index newIndex = new Index(dataverseName, datasetName, indexName, indexType, indexFields,
+                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes,
+                    stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
             doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags, sourceLoc);
         } finally {
             metadataProvider.getLocks().unlock();
@@ -1285,6 +1285,14 @@
         }
     }
 
+    protected void validateIndexType(DatasetType datasetType, IndexType indexType, boolean isSecondaryPrimaryIndex,
+            SourceLocation sourceLoc) throws AlgebricksException {
+        // disable creating secondary primary index on an external dataset
+        if (datasetType == DatasetType.EXTERNAL && isSecondaryPrimaryIndex) {
+            throw new CompilationException(ErrorCode.CANNOT_CREATE_SEC_PRIMARY_IDX_ON_EXT_DATASET);
+        }
+    }
+
     protected void validateIndexKeyFields(CreateIndexStatement stmtCreateIndex, List<Integer> keySourceIndicators,
             ARecordType aRecordType, ARecordType metaRecordType, List<List<String>> indexFields,
             List<IAType> indexFieldTypes) throws AlgebricksException {