[NO ISSUE][COMP] Refactor locking in CREATE DATASET

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Refactor locking in CREATE DATASET to make it extensible
  by products (introduce QueryTranslator.createDatasetBegin())
- Refactor MetadataLockUtil.createDatasetBegin() into two methods

Change-Id: I1b430a5e8f4a3cdf1a360397e3d160f7b8c95d26
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6323
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 818fbd4..c2d50b3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -562,7 +562,8 @@
         String datasetName = dd.getName().getValue();
         DatasetType dsType = dd.getDatasetType();
         TypeExpression itemTypeExpr = dd.getItemType();
-        String itemTypeDataverseName = null, itemTypeName = null, itemTypeFullyQualifiedName = null;
+        String itemTypeDataverseName, itemTypeName;
+        boolean itemTypeAnonymous;
         switch (itemTypeExpr.getTypeKind()) {
             case TYPEREFERENCE:
                 TypeReferenceExpression itemTypeRefExpr = (TypeReferenceExpression) itemTypeExpr;
@@ -570,17 +571,22 @@
                 itemTypeDataverseName = itemTypeDataverseIdent != null && itemTypeDataverseIdent.getValue() != null
                         ? itemTypeDataverseIdent.getValue() : dataverseName;
                 itemTypeName = itemTypeRefExpr.getIdent().second.getValue();
-                itemTypeFullyQualifiedName = itemTypeDataverseName + '.' + itemTypeName;
+                itemTypeAnonymous = false;
                 break;
             case RECORD:
+                itemTypeDataverseName = dataverseName;
+                itemTypeName = DatasetUtil.createInlineTypeName(datasetName, false);
+                itemTypeAnonymous = true;
                 break;
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
                         String.valueOf(itemTypeExpr.getTypeKind()));
         }
+        String itemTypeFullyQualifiedName = itemTypeDataverseName + '.' + itemTypeName;
 
         TypeExpression metaItemTypeExpr = dd.getMetaItemType();
-        String metaItemTypeDataverseName = null, metaItemTypeName = null, metaItemTypeFullyQualifiedName = null;
+        String metaItemTypeDataverseName = null, metaItemTypeName = null, metaItemTypeFullyQualifiedName;
+        boolean metaItemTypeAnonymous;
         if (metaItemTypeExpr != null) {
             switch (metaItemTypeExpr.getTypeKind()) {
                 case TYPEREFERENCE:
@@ -590,14 +596,21 @@
                             metaItemTypeDataverseIdent != null && metaItemTypeDataverseIdent.getValue() != null
                                     ? metaItemTypeDataverseIdent.getValue() : dataverseName;
                     metaItemTypeName = metaItemTypeRefExpr.getIdent().second.getValue();
-                    metaItemTypeFullyQualifiedName = metaItemTypeDataverseName + '.' + metaItemTypeName;
+                    metaItemTypeAnonymous = false;
                     break;
                 case RECORD:
+                    metaItemTypeDataverseName = dataverseName;
+                    metaItemTypeName = DatasetUtil.createInlineTypeName(datasetName, true);
+                    metaItemTypeAnonymous = true;
                     break;
                 default:
                     throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
                             String.valueOf(metaItemTypeExpr.getTypeKind()));
             }
+            metaItemTypeFullyQualifiedName = metaItemTypeDataverseName + '.' + metaItemTypeName;
+        } else {
+            metaItemTypeFullyQualifiedName = null;
+            metaItemTypeAnonymous = true; // doesn't matter
         }
 
         Identifier ngNameId = dd.getNodegroupName();
@@ -610,10 +623,10 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName,
-                itemTypeDataverseName, itemTypeFullyQualifiedName, metaItemTypeDataverseName,
-                metaItemTypeFullyQualifiedName, nodegroupName, compactionPolicy, dataverseName + "." + datasetName,
-                defaultCompactionPolicy);
+        createDatasetBegin(metadataProvider, dataverseName, dataverseName + "." + datasetName, itemTypeDataverseName,
+                itemTypeAnonymous, itemTypeFullyQualifiedName, metaItemTypeDataverseName,
+                metaItemTypeFullyQualifiedName, metaItemTypeAnonymous, nodegroupName, compactionPolicy,
+                defaultCompactionPolicy, dd);
         Dataset dataset = null;
         try {
             IDatasetDetails datasetDetails = null;
@@ -632,17 +645,12 @@
                     Datatype itemTypeEntity = metadataProvider.findTypeEntity(itemTypeDataverseName, itemTypeName);
                     if (itemTypeEntity == null || itemTypeEntity.getIsAnonymous()) {
                         // anonymous types cannot be referred from CREATE DATASET
-                        throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc,
-                                itemTypeDataverseName + "." + itemTypeName);
+                        throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc, itemTypeFullyQualifiedName);
                     }
                     itemType = itemTypeEntity.getDatatype();
                     validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     break;
                 case RECORD:
-                    itemTypeDataverseName = dataverseName;
-                    itemTypeName = DatasetUtil.createInlineTypeName(datasetName, false);
-                    MetadataLockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), itemTypeDataverseName,
-                            itemTypeDataverseName + "." + itemTypeName);
                     itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
                     validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
@@ -673,16 +681,12 @@
                                 if (metaItemTypeEntity == null || metaItemTypeEntity.getIsAnonymous()) {
                                     // anonymous types cannot be referred from CREATE DATASET
                                     throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc,
-                                            metaItemTypeDataverseName + "." + metaItemTypeName);
+                                            metaItemTypeFullyQualifiedName);
                                 }
                                 metaItemType = metaItemTypeEntity.getDatatype();
                                 validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
                                 break;
                             case RECORD:
-                                metaItemTypeDataverseName = dataverseName;
-                                metaItemTypeName = DatasetUtil.createInlineTypeName(datasetName, true);
-                                MetadataLockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(),
-                                        metaItemTypeDataverseName, metaItemTypeDataverseName + "." + metaItemTypeName);
                                 metaItemType = translateType(metaItemTypeDataverseName, metaItemTypeName,
                                         metaItemTypeExpr, mdTxnCtx);
                                 validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
@@ -722,7 +726,8 @@
                     break;
                 case EXTERNAL:
                     ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
-                    Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx);
+                    Map<String, String> properties =
+                            createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx);
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     validateExternalDatasetDetails(externalDetails, properties);
@@ -816,6 +821,17 @@
         }
     }
 
+    protected void createDatasetBegin(MetadataProvider metadataProvider, String dataverseName,
+            String datasetFullyQualifiedName, String itemTypeDataverseName, boolean itemTypeAnonymous,
+            String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
+            boolean metaItemTypeAnonymous, String nodegroupName, String compactionPolicy,
+            boolean defaultCompactionPolicy, DatasetDecl dd) throws AlgebricksException {
+        MetadataLockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName,
+                itemTypeDataverseName, itemTypeFullyQualifiedName, itemTypeAnonymous, metaItemTypeDataverseName,
+                metaItemTypeFullyQualifiedName, metaItemTypeAnonymous, nodegroupName, compactionPolicy,
+                datasetFullyQualifiedName, defaultCompactionPolicy);
+    }
+
     protected void validateDatasetItemType(DatasetType datasetType, IAType itemType, boolean isMetaItemType,
             SourceLocation sourceLoc) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -824,8 +840,8 @@
         }
     }
 
-    protected Map<String, String> createExternalDatasetProperties(DatasetDecl dd, MetadataProvider metadataProvider,
-            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+    protected Map<String, String> createExternalDatasetProperties(String dataverseName, DatasetDecl dd,
+            MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
         ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
         return externalDetails.getProperties();
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index baac712..037a560 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -27,10 +27,11 @@
     private MetadataLockUtil() {
     }
 
-    public static void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
-            String itemTypeDataverseName, String itemTypeFullyQualifiedName, String metaItemTypeDataverseName,
-            String metaItemTypeFullyQualifiedName, String nodeGroupName, String compactionPolicyName,
-            String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) throws AlgebricksException {
+    public static void createDatasetBeginPre(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String itemTypeDataverseName, String itemTypeFullyQualifiedName, boolean itemTypeAnonymous,
+            String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, boolean metaItemTypeAnonymous,
+            String nodeGroupName, String compactionPolicyName, boolean isDefaultCompactionPolicy)
+            throws AlgebricksException {
         lockMgr.acquireDataverseReadLock(locks, dataverseName);
         if (!dataverseName.equals(itemTypeDataverseName)) {
             lockMgr.acquireDataverseReadLock(locks, itemTypeDataverseName);
@@ -39,10 +40,20 @@
                 && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
             lockMgr.acquireDataverseReadLock(locks, metaItemTypeDataverseName);
         }
-        lockMgr.acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName);
+        if (itemTypeAnonymous) {
+            // the datatype will be created
+            lockMgr.acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName);
+        } else {
+            lockMgr.acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName);
+        }
         if (metaItemTypeFullyQualifiedName != null
                 && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
-            lockMgr.acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName);
+            if (metaItemTypeAnonymous) {
+                // the datatype will be created
+                lockMgr.acquireDataTypeWriteLock(locks, metaItemTypeFullyQualifiedName);
+            } else {
+                lockMgr.acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName);
+            }
         }
         if (nodeGroupName != null) {
             lockMgr.acquireNodeGroupReadLock(locks, nodeGroupName);
@@ -50,6 +61,16 @@
         if (!isDefaultCompactionPolicy) {
             lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName);
         }
+    }
+
+    public static void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName,
+            String itemTypeDataverseName, String itemTypeFullyQualifiedName, boolean itemTypeAnonymous,
+            String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, boolean metaItemTypeAnonymous,
+            String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
+            boolean isDefaultCompactionPolicy) throws AlgebricksException {
+        createDatasetBeginPre(lockMgr, locks, dataverseName, itemTypeDataverseName, itemTypeFullyQualifiedName,
+                itemTypeAnonymous, metaItemTypeDataverseName, metaItemTypeFullyQualifiedName, metaItemTypeAnonymous,
+                nodeGroupName, compactionPolicyName, isDefaultCompactionPolicy);
         lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
     }