[ASTERIXDB-3259][MTD] Ensure no DDLs/DMLs on System database

- user model changes: no
- storage format changes: no
- interface changes: no

Change-Id: I330aabe9b91ef4aa65f5c353b94eaa9564adbd0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17883
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index e7e1514..6536c86 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -87,12 +87,20 @@
 
     protected static final String BAD_DATAVERSE_DML_MESSAGE = "%s operation is not permitted in " + dataverse() + " %s";
 
+    protected static final String BAD_DATAVERSE_DML_MSG_DB =
+            "%s operation is not permitted in " + dataverse() + " %s in database %s";
+
     protected static final String BAD_DATAVERSE_DDL_MESSAGE = "Cannot %s " + dataverse() + ": %s";
 
+    protected static final String BAD_DATAVERSE_DDL_MSG_DB = "Cannot %s " + dataverse() + ": %s in database: %s";
+
     protected static final String BAD_DATAVERSE_OBJECT_DDL_MESSAGE =
             "Cannot %s a %s belonging to the " + dataverse() + ": %s";
 
-    public void validateOperation(ICcApplicationContext appCtx, Namespace namespace, Statement stmt)
+    protected static final String BAD_DATAVERSE_OBJECT_DDL_MSG_DB =
+            "Cannot %s a %s belonging to the " + dataverse() + ": %s in database: %s";
+
+    public void validateOperation(ICcApplicationContext appCtx, Namespace activeNamespace, Statement stmt)
             throws AlgebricksException {
 
         final IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
@@ -149,51 +157,41 @@
             }
         }
 
+        boolean usingDb = appCtx.getNamespaceResolver().isUsingDatabase();
         boolean invalidOperation = false;
         String message = null;
-        DataverseName dataverseName = namespace != null ? namespace.getDataverseName() : null;
+        DataverseName dataverseName;
+        Namespace namespace;
         switch (stmt.getKind()) {
             case LOAD:
-                LoadStatement loadStmt = (LoadStatement) stmt;
-                if (loadStmt.getDataverseName() != null) {
-                    dataverseName = loadStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((LoadStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_DML_MESSAGE, "Load", dataverseName);
+                    message = formatDmlMessage("Load", namespace, usingDb);
                 }
                 break;
 
             case INSERT:
-                InsertStatement insertStmt = (InsertStatement) stmt;
-                if (insertStmt.getDataverseName() != null) {
-                    dataverseName = insertStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((InsertStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_DML_MESSAGE, "Insert", dataverseName);
+                    message = formatDmlMessage("Insert", namespace, usingDb);
                 }
                 break;
 
             case UPSERT:
-                UpsertStatement upsertStmt = (UpsertStatement) stmt;
-                if (upsertStmt.getDataverseName() != null) {
-                    dataverseName = upsertStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((UpsertStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_DML_MESSAGE, "Upsert", dataverseName);
+                    message = formatDmlMessage("Upsert", namespace, usingDb);
                 }
                 break;
 
             case DELETE:
-                DeleteStatement deleteStmt = (DeleteStatement) stmt;
-                if (deleteStmt.getDataverseName() != null) {
-                    dataverseName = deleteStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((DeleteStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_DML_MESSAGE, "Delete", dataverseName);
+                    message = formatDmlMessage("Delete", namespace, usingDb);
                 }
                 break;
 
@@ -218,36 +216,32 @@
             }
 
             case CREATE_DATAVERSE:
-                //TODO(DB): check it's not System database for all cases
                 CreateDataverseStatement dvCreateStmt = (CreateDataverseStatement) stmt;
                 dataverseName = dvCreateStmt.getDataverseName();
                 invalidOperation = FunctionConstants.ASTERIX_DV.equals(dataverseName)
                         || FunctionConstants.ALGEBRICKS_DV.equals(dataverseName) || isMetadataDataverse(dataverseName)
-                        || isDefaultDataverse(dataverseName);
+                        || isDefaultDataverse(dataverseName) || isSystemDatabase(dvCreateStmt.getDatabaseName());
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_DDL_MESSAGE, "create", dataverseName);
+                    message = formatDdlMessage("create", dataverseName, dvCreateStmt.getDatabaseName(), usingDb);
                 }
                 break;
 
             case DATAVERSE_DROP:
-                //TODO(DB): check it's not System database for all cases
                 DataverseDropStatement dvDropStmt = (DataverseDropStatement) stmt;
                 dataverseName = dvDropStmt.getDataverseName();
-                invalidOperation = isMetadataDataverse(dataverseName) || isDefaultDataverse(dataverseName);
+                invalidOperation = isMetadataDataverse(dataverseName) || isDefaultDataverse(dataverseName)
+                        || isSystemDatabase(dvDropStmt.getDatabaseName());
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_DDL_MESSAGE, "drop", dataverseName);
+                    message = formatDdlMessage("drop", dataverseName, dvDropStmt.getDatabaseName(), usingDb);
                 }
                 break;
 
             case DATASET_DECL:
-                //TODO(DB): check it's not System database for all cases
                 DatasetDecl dsCreateStmt = (DatasetDecl) stmt;
-                if (dsCreateStmt.getDataverse() != null) {
-                    dataverseName = dsCreateStmt.getDataverse();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((DatasetDecl) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", dataset(), dataverseName);
+                    message = formatObjectDdlMessage("create", dataset(), namespace, usingDb);
                 }
 
                 if (!invalidOperation) {
@@ -273,75 +267,57 @@
                 break;
 
             case DATASET_DROP:
-                //TODO(DB): check it's not System database for all cases
-                DropDatasetStatement dsDropStmt = (DropDatasetStatement) stmt;
-                if (dsDropStmt.getDataverseName() != null) {
-                    dataverseName = dsDropStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((DropDatasetStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "drop", dataset(), dataverseName);
+                    message = formatObjectDdlMessage("drop", dataset(), namespace, usingDb);
                 }
                 break;
 
             case INDEX_DROP:
-                //TODO(DB): check it's not System database for all cases
-                IndexDropStatement idxDropStmt = (IndexDropStatement) stmt;
-                if (idxDropStmt.getDataverseName() != null) {
-                    dataverseName = idxDropStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((IndexDropStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "drop", "index", dataverseName);
+                    message = formatObjectDdlMessage("drop", "index", namespace, usingDb);
                 }
                 break;
 
             case TYPE_DECL:
-                //TODO(DB): check it's not System database for all cases
-                TypeDecl typeCreateStmt = (TypeDecl) stmt;
-                if (typeCreateStmt.getDataverseName() != null) {
-                    dataverseName = typeCreateStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((TypeDecl) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "type", dataverseName);
+                    message = formatObjectDdlMessage("create", "type", namespace, usingDb);
                 }
                 break;
 
             case TYPE_DROP:
-                //TODO(DB): check it's not System database for all cases
-                TypeDropStatement typeDropStmt = (TypeDropStatement) stmt;
-                if (typeDropStmt.getDataverseName() != null) {
-                    dataverseName = typeDropStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((TypeDropStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "drop", "type", dataverseName);
+                    message = formatObjectDdlMessage("drop", "type", namespace, usingDb);
                 }
                 break;
 
             case CREATE_SYNONYM:
-                //TODO(DB): check it's not System database for all cases
-                CreateSynonymStatement synCreateStmt = (CreateSynonymStatement) stmt;
-                if (synCreateStmt.getDataverseName() != null) {
-                    dataverseName = synCreateStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((CreateSynonymStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "synonym", dataverseName);
+                    message = formatObjectDdlMessage("create", "synonym", namespace, usingDb);
                 }
                 break;
 
             case FUNCTION_DECL:
-                //TODO(DB): check it's not System database for all cases
+                //TODO(DB): change to use namespace like others
                 FunctionDecl fnDeclStmt = (FunctionDecl) stmt;
                 FunctionSignature fnDeclSignature = fnDeclStmt.getSignature();
                 if (fnDeclSignature.getDataverseName() != null) {
-                    dataverseName = fnDeclSignature.getDataverseName();
+                    namespace = new Namespace(fnDeclSignature.getDatabaseName(), fnDeclSignature.getDataverseName());
+                } else {
+                    namespace = activeNamespace;
                 }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "declare", "function", dataverseName);
+                    message = formatObjectDdlMessage("declare", "function", namespace, usingDb);
                 }
                 break;
 
@@ -350,91 +326,68 @@
                 CreateFunctionStatement fnCreateStmt = (CreateFunctionStatement) stmt;
                 FunctionSignature fnCreateSignature = fnCreateStmt.getFunctionSignature();
                 if (fnCreateSignature.getDataverseName() != null) {
-                    dataverseName = fnCreateSignature.getDataverseName();
+                    namespace =
+                            new Namespace(fnCreateSignature.getDatabaseName(), fnCreateSignature.getDataverseName());
+                } else {
+                    namespace = activeNamespace;
                 }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "function", dataverseName);
+                    message = formatObjectDdlMessage("create", "function", namespace, usingDb);
                 }
                 break;
 
             case CREATE_LIBRARY:
-                //TODO(DB): check it's not System database for all cases
-                CreateLibraryStatement libCreateStmt = (CreateLibraryStatement) stmt;
-                if (libCreateStmt.getDataverseName() != null) {
-                    dataverseName = libCreateStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((CreateLibraryStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "library", dataverseName);
+                    message = formatObjectDdlMessage("create", "library", namespace, usingDb);
                 }
                 break;
 
             case CREATE_ADAPTER:
-                //TODO(DB): check it's not System database for all cases
-                CreateAdapterStatement adCreateStmt = (CreateAdapterStatement) stmt;
-                if (adCreateStmt.getDataverseName() != null) {
-                    dataverseName = adCreateStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((CreateAdapterStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "adapter", dataverseName);
+                    message = formatObjectDdlMessage("create", "adapter", namespace, usingDb);
                 }
                 break;
 
             case CREATE_VIEW:
-                //TODO(DB): check it's not System database for all cases
-                CreateViewStatement viewCreateStmt = (CreateViewStatement) stmt;
-                if (viewCreateStmt.getDataverseName() != null) {
-                    dataverseName = viewCreateStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((CreateViewStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "view", dataverseName);
+                    message = formatObjectDdlMessage("create", "view", namespace, usingDb);
                 }
                 break;
 
             case CREATE_FEED:
-                //TODO(DB): check it's not System database for all cases
-                CreateFeedStatement feedCreateStmt = (CreateFeedStatement) stmt;
-                if (feedCreateStmt.getDataverseName() != null) {
-                    dataverseName = feedCreateStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((CreateFeedStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "feed", dataverseName);
+                    message = formatObjectDdlMessage("create", "feed", namespace, usingDb);
                 }
                 break;
 
             case CREATE_FEED_POLICY:
-                //TODO(DB): check it's not System database for all cases
-                invalidOperation = isMetadataDataverse(dataverseName);
+                invalidOperation = isSystemNamespace(activeNamespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "create", "ingestion policy",
-                            dataverseName);
+                    message = formatObjectDdlMessage("create", "ingestion policy", activeNamespace, usingDb);
                 }
                 break;
 
             case ANALYZE:
-                //TODO(DB): check it's not System database for all cases
-                AnalyzeStatement analyzeStmt = (AnalyzeStatement) stmt;
-                if (analyzeStmt.getDataverseName() != null) {
-                    dataverseName = analyzeStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((AnalyzeStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "analyze", dataset(), dataverseName);
+                    message = formatObjectDdlMessage("analyze", dataset(), namespace, usingDb);
                 }
                 break;
             case ANALYZE_DROP:
-                //TODO(DB): check it's not System database for all cases
-                AnalyzeDropStatement analyzeDropStmt = (AnalyzeDropStatement) stmt;
-                if (analyzeDropStmt.getDataverseName() != null) {
-                    dataverseName = analyzeDropStmt.getDataverseName();
-                }
-                invalidOperation = isMetadataDataverse(dataverseName);
+                namespace = getStatementNamespace(((AnalyzeDropStatement) stmt).getNamespace(), activeNamespace);
+                invalidOperation = isSystemNamespace(namespace);
                 if (invalidOperation) {
-                    message = String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, "analyze drop", dataset(), dataverseName);
+                    message = formatObjectDdlMessage("analyze drop", dataset(), namespace, usingDb);
                 }
                 break;
         }
@@ -445,6 +398,31 @@
         }
     }
 
+    private static String formatDmlMessage(String operation, Namespace ns, boolean usingDb) {
+        DataverseName dv = ns.getDataverseName();
+        return usingDb ? String.format(BAD_DATAVERSE_DML_MSG_DB, operation, dv, ns.getDatabaseName())
+                : String.format(BAD_DATAVERSE_DML_MESSAGE, operation, dv);
+    }
+
+    private static String formatDdlMessage(String operation, DataverseName dv, String db, boolean usingDb) {
+        return usingDb ? String.format(BAD_DATAVERSE_DDL_MSG_DB, operation, dv, db)
+                : String.format(BAD_DATAVERSE_DDL_MESSAGE, operation, dv);
+    }
+
+    protected static String formatObjectDdlMessage(String operation, String object, Namespace ns, boolean usingDb) {
+        DataverseName dv = ns.getDataverseName();
+        return usingDb ? String.format(BAD_DATAVERSE_OBJECT_DDL_MSG_DB, operation, object, dv, ns.getDatabaseName())
+                : String.format(BAD_DATAVERSE_OBJECT_DDL_MESSAGE, operation, object, dv);
+    }
+
+    protected static Namespace getStatementNamespace(Namespace namespace, Namespace activeNamespace) {
+        return namespace != null ? namespace : activeNamespace;
+    }
+
+    protected static boolean isSystemNamespace(Namespace ns) {
+        return ns != null && (isSystemDatabase(ns.getDatabaseName()) || isMetadataDataverse(ns.getDataverseName()));
+    }
+
     protected static boolean isSystemDatabase(String databaseName) {
         return MetadataConstants.SYSTEM_DATABASE.equals(databaseName);
     }