Merge commit '6904d36' from 'gerrit/mad-hatter'

Change-Id: Ib118bb2d1b18a93412015b1b652684c3fbb11e52
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f2302f9..95e67be 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -587,16 +587,21 @@
         DataverseName dataverseName = getActiveDataverseName(dd.getDataverse());
         String datasetName = dd.getName().getValue();
         TypeExpression itemTypeExpr = dd.getItemType();
-        DataverseName itemTypeDataverseName = null;
-        String itemTypeName = null;
+        DataverseName itemTypeDataverseName;
+        String itemTypeName;
+        boolean itemTypeAnonymous;
         switch (itemTypeExpr.getTypeKind()) {
             case TYPEREFERENCE:
                 TypeReferenceExpression itemTypeRefExpr = (TypeReferenceExpression) itemTypeExpr;
                 Pair<DataverseName, Identifier> itemTypeIdent = itemTypeRefExpr.getIdent();
                 itemTypeDataverseName = itemTypeIdent.first != null ? itemTypeIdent.first : dataverseName;
                 itemTypeName = itemTypeRefExpr.getIdent().second.getValue();
+                itemTypeAnonymous = false;
                 break;
             case RECORD:
+                itemTypeDataverseName = dataverseName;
+                itemTypeName = DatasetUtil.createInlineTypeName(datasetName, false);
+                itemTypeAnonymous = true;
                 break;
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(),
@@ -606,6 +611,7 @@
         TypeExpression metaItemTypeExpr = dd.getMetaItemType();
         DataverseName metaItemTypeDataverseName = null;
         String metaItemTypeName = null;
+        boolean metaItemTypeAnonymous;
         if (metaItemTypeExpr != null) {
             switch (metaItemTypeExpr.getTypeKind()) {
                 case TYPEREFERENCE:
@@ -614,13 +620,19 @@
                     metaItemTypeDataverseName =
                             metaItemTypeIdent.first != null ? metaItemTypeIdent.first : dataverseName;
                     metaItemTypeName = metaItemTypeRefExpr.getIdent().second.getValue();
+                    metaItemTypeAnonymous = false;
                     break;
                 case RECORD:
+                    metaItemTypeDataverseName = dataverseName;
+                    metaItemTypeName = DatasetUtil.createInlineTypeName(datasetName, true);
+                    metaItemTypeAnonymous = true;
                     break;
                 default:
                     throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(),
                             String.valueOf(metaItemTypeExpr.getTypeKind()));
             }
+        } else {
+            metaItemTypeAnonymous = true; // doesn't matter
         }
 
         Identifier ngNameId = dd.getNodegroupName();
@@ -629,8 +641,9 @@
         boolean defaultCompactionPolicy = compactionPolicy == null;
 
         lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName,
-                itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName, metaItemTypeName, nodegroupName,
-                compactionPolicy, defaultCompactionPolicy, dd.getDatasetDetailsDecl());
+                itemTypeDataverseName, itemTypeName, itemTypeAnonymous, metaItemTypeDataverseName, metaItemTypeName,
+                metaItemTypeAnonymous, nodegroupName, compactionPolicy, defaultCompactionPolicy, dd.getDatasetType(),
+                dd.getDatasetDetailsDecl());
         try {
             doCreateDatasetStatement(metadataProvider, dd, dataverseName, datasetName, itemTypeDataverseName,
                     itemTypeExpr, itemTypeName, metaItemTypeExpr, metaItemTypeDataverseName, metaItemTypeName, hcc,
@@ -675,16 +688,12 @@
                     if (itemTypeEntity == null || itemTypeEntity.getIsAnonymous()) {
                         // anonymous types cannot be referred from CREATE DATASET
                         throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc,
-                                itemTypeDataverseName + "." + itemTypeName);
+                                DatasetUtil.getFullyQualifiedDisplayName(itemTypeDataverseName, itemTypeName));
                     }
                     itemType = itemTypeEntity.getDatatype();
                     validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     break;
                 case RECORD:
-                    itemTypeDataverseName = dataverseName;
-                    itemTypeName = DatasetUtil.createInlineTypeName(datasetName, false);
-                    lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), itemTypeDataverseName,
-                            itemTypeName);
                     itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
                     validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
@@ -714,17 +723,13 @@
                                         metadataProvider.findTypeEntity(metaItemTypeDataverseName, metaItemTypeName);
                                 if (metaItemTypeEntity == null || metaItemTypeEntity.getIsAnonymous()) {
                                     // anonymous types cannot be referred from CREATE DATASET
-                                    throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc,
-                                            metaItemTypeDataverseName + "." + metaItemTypeName);
+                                    throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc, DatasetUtil
+                                            .getFullyQualifiedDisplayName(metaItemTypeDataverseName, metaItemTypeName));
                                 }
                                 metaItemType = metaItemTypeEntity.getDatatype();
                                 validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
                                 break;
                             case RECORD:
-                                metaItemTypeDataverseName = dataverseName;
-                                metaItemTypeName = DatasetUtil.createInlineTypeName(datasetName, true);
-                                lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(),
-                                        metaItemTypeDataverseName, metaItemTypeName);
                                 metaItemType = translateType(metaItemTypeDataverseName, metaItemTypeName,
                                         metaItemTypeExpr, mdTxnCtx);
                                 validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
@@ -764,7 +769,8 @@
                     break;
                 case EXTERNAL:
                     ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
-                    Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx);
+                    Map<String, String> properties =
+                            createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx);
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     validateExternalDatasetDetails(externalDetails, properties);
@@ -864,8 +870,8 @@
         }
     }
 
-    protected Map<String, String> createExternalDatasetProperties(DatasetDecl dd, MetadataProvider metadataProvider,
-            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+    protected Map<String, String> createExternalDatasetProperties(DataverseName dataverseName, DatasetDecl dd,
+            MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
         ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
         return externalDetails.getProperties();
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
index ea7941e..b788c1a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.common.metadata;
 
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public interface IMetadataLockUtil {
@@ -35,10 +36,10 @@
     // Dataset helpers
 
     void createDatasetBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
-            String datasetName, DataverseName itemTypeDataverseName, String itemTypeName,
-            DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
-            String compactionPolicyName, boolean isDefaultCompactionPolicy, Object datasetDetails)
-            throws AlgebricksException;
+            String datasetName, DataverseName itemTypeDataverseName, String itemTypeName, boolean itemTypeAnonymous,
+            DataverseName metaItemTypeDataverseName, String metaItemTypeName, boolean metaItemTypeAnonymous,
+            String nodeGroupName, String compactionPolicyName, boolean isDefaultCompactionPolicy,
+            DatasetConfig.DatasetType datasetType, Object datasetDetails) throws AlgebricksException;
 
     void dropDatasetBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
             String datasetName) throws AlgebricksException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 1b88a06..eab69e0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.metadata.utils;
 
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.metadata.LockList;
@@ -40,20 +41,21 @@
 
     @Override
     public void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
-            String datasetName, DataverseName itemTypeDataverseName, String itemTypeName,
-            DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
-            String compactionPolicyName, boolean isDefaultCompactionPolicy, Object datasetDetails)
-            throws AlgebricksException {
-        createDatasetBeginPre(lockMgr, locks, dataverseName, itemTypeDataverseName, itemTypeName,
-                metaItemTypeDataverseName, metaItemTypeName, nodeGroupName, compactionPolicyName,
+            String datasetName, DataverseName itemTypeDataverseName, String itemTypeName, boolean itemTypeAnonymous,
+            DataverseName metaItemTypeDataverseName, String metaItemTypeName, boolean metaItemTypeAnonymous,
+            String nodeGroupName, String compactionPolicyName, boolean isDefaultCompactionPolicy,
+            DatasetConfig.DatasetType datasetType, Object datasetDetails) throws AlgebricksException {
+        createDatasetBeginPre(lockMgr, locks, dataverseName, itemTypeDataverseName, itemTypeName, itemTypeAnonymous,
+                metaItemTypeDataverseName, metaItemTypeName, metaItemTypeAnonymous, nodeGroupName, compactionPolicyName,
                 isDefaultCompactionPolicy);
         lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
     }
 
     protected final void createDatasetBeginPre(IMetadataLockManager lockMgr, LockList locks,
             DataverseName dataverseName, DataverseName itemTypeDataverseName, String itemTypeName,
-            DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
-            String compactionPolicyName, boolean isDefaultCompactionPolicy) throws AlgebricksException {
+            boolean itemTypeAnonymous, DataverseName metaItemTypeDataverseName, String metaItemTypeName,
+            boolean metaItemTypeAnonymous, String nodeGroupName, String compactionPolicyName,
+            boolean isDefaultCompactionPolicy) throws AlgebricksException {
         lockMgr.acquireDataverseReadLock(locks, dataverseName);
         if (itemTypeDataverseName != null && !dataverseName.equals(itemTypeDataverseName)) {
             lockMgr.acquireDataverseReadLock(locks, itemTypeDataverseName);
@@ -62,12 +64,20 @@
                 && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
             lockMgr.acquireDataverseReadLock(locks, metaItemTypeDataverseName);
         }
-        if (itemTypeName != null) {
+        if (itemTypeAnonymous) {
+            // the datatype will be created
+            lockMgr.acquireDataTypeWriteLock(locks, itemTypeDataverseName, itemTypeName);
+        } else {
             lockMgr.acquireDataTypeReadLock(locks, itemTypeDataverseName, itemTypeName);
         }
         if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(itemTypeDataverseName)
                 && !metaItemTypeName.equals(itemTypeName)) {
-            lockMgr.acquireDataTypeReadLock(locks, metaItemTypeDataverseName, metaItemTypeName);
+            if (metaItemTypeAnonymous) {
+                // the datatype will be created
+                lockMgr.acquireDataTypeWriteLock(locks, metaItemTypeDataverseName, metaItemTypeName);
+            } else {
+                lockMgr.acquireDataTypeReadLock(locks, metaItemTypeDataverseName, metaItemTypeName);
+            }
         }
         if (nodeGroupName != null) {
             lockMgr.acquireNodeGroupReadLock(locks, nodeGroupName);