[NO ISSUE][COMP] Extensible item type check in CREATE DATASET

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Introduce QueryTranslator.validateDatasetItemType().
  By overriding this method, product extensions can further
  restrict the types that are allowed for dataset creation
  (see the sketch below).
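
  A minimal sketch of how a product extension might use this hook.
  ProductQueryTranslator, its wiring, and the extra "closed record"
  restriction are illustrative assumptions, not part of this change:

      // imports as in QueryTranslator, plus org.apache.asterix.om.types.ARecordType
      public class ProductQueryTranslator extends QueryTranslator {
          // ... constructor delegating to QueryTranslator omitted ...

          @Override
          protected void validateDatasetItemType(DatasetType datasetType, IAType itemType,
                  boolean isMetaItemType, SourceLocation sourceLoc) throws AlgebricksException {
              // Keep the base requirement: dataset (meta) item types must be record types.
              super.validateDatasetItemType(datasetType, itemType, isMetaItemType, sourceLoc);
              // Hypothetical further restriction: only closed record types for internal datasets.
              if (datasetType == DatasetType.INTERNAL && ((ARecordType) itemType).isOpen()) {
                  throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                          String.format("Dataset %s has to be a closed record type.",
                                  isMetaItemType ? "meta type" : "type"));
              }
          }
      }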

Change-Id: I426430707766b7b6049abd2687bee69c011335fb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5785
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e211531..9d427ec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -636,6 +636,7 @@
                                 itemTypeDataverseName + "." + itemTypeName);
                     }
                     itemType = itemTypeEntity.getDatatype();
+                    validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     break;
                 case RECORD:
                     itemTypeDataverseName = dataverseName;
@@ -643,6 +644,7 @@
                     MetadataLockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), itemTypeDataverseName,
                             itemTypeDataverseName + "." + itemTypeName);
                     itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
+                    validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
                             new Datatype(itemTypeDataverseName, itemTypeName, itemType, true));
                     break;
@@ -660,13 +662,8 @@
             } else {
                 validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false, sourceLoc);
             }
-            switch (dd.getDatasetType()) {
+            switch (dsType) {
                 case INTERNAL:
-                    if (itemType.getTypeTag() != ATypeTag.OBJECT) {
-                        throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                                "Dataset type has to be a record type.");
-                    }
-
                     IAType metaItemType = null;
                     if (metaItemTypeExpr != null) {
                         switch (metaItemTypeExpr.getTypeKind()) {
@@ -679,10 +676,7 @@
                                             metaItemTypeDataverseName + "." + metaItemTypeName);
                                 }
                                 metaItemType = metaItemTypeEntity.getDatatype();
-                                if (metaItemType.getTypeTag() != ATypeTag.OBJECT) {
-                                    throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                                            "Dataset meta type has to be a record type.");
-                                }
+                                validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
                                 break;
                             case RECORD:
                                 metaItemTypeDataverseName = dataverseName;
@@ -691,6 +685,7 @@
                                         metaItemTypeDataverseName, metaItemTypeDataverseName + "." + metaItemTypeName);
                                 metaItemType = translateType(metaItemTypeDataverseName, metaItemTypeName,
                                         metaItemTypeExpr, mdTxnCtx);
+                                validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
                                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
                                         new Datatype(metaItemTypeDataverseName, metaItemTypeName, metaItemType, true));
                                 break;
@@ -734,7 +729,7 @@
                     break;
                 default:
                     throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                            "Unknown dataset type " + dd.getDatasetType());
+                            "Unknown dataset type " + dsType);
             }
 
             // #. initialize DatasetIdFactory if it is not initialized.
@@ -748,7 +743,7 @@
                     datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
                     MetadataUtil.PENDING_ADD_OP, compressionScheme);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-            if (dd.getDatasetType() == DatasetType.INTERNAL) {
+            if (dsType == DatasetType.INTERNAL) {
                 JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
 
                 // #. make metadataTxn commit before calling runJob.
@@ -819,6 +814,14 @@
         }
     }
 
+    protected void validateDatasetItemType(DatasetType datasetType, IAType itemType, boolean isMetaItemType,
+            SourceLocation sourceLoc) throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.OBJECT) {
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
+                    String.format("Dataset %s has to be a record type.", isMetaItemType ? "meta type" : "type"));
+        }
+    }
+
     protected Map<String, String> createExternalDatasetProperties(DatasetDecl dd, MetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
         ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();