[ASTERIXDB-3290][JDBC]  Changes to support database entity

Details:
Updated ADBMetadataStatement so that the Metadata queries
generated in case database entity is supported by the server
are compatible with the server metadata.

Change-Id: I53242717a5278fffd77c682a66694bb3c02b3ee6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb-clients/+/17873
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
index ca61992..628c3fe 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
@@ -60,6 +60,7 @@
     private volatile ADBMetaStatement metaStatement;
     private volatile String catalog;
     private volatile String schema;
+    private final boolean databaseEntitySupported;
 
     // Lifecycle
 
@@ -76,20 +77,31 @@
         this.catalogDataverseMode = getCatalogDataverseMode(properties, protocol.getErrorReporter());
         this.catalogIncludesSchemaless =
                 (Boolean) ADBDriverProperty.Common.CATALOG_INCLUDES_SCHEMALESS.fetchPropertyValue(properties);
+        this.databaseEntitySupported = checkDatabaseEntitySupport();
         initCatalogSchema(protocol, dataverseCanonicalName);
     }
 
     protected void initCatalogSchema(ADBProtocolBase protocol, String dataverseCanonicalName) throws SQLException {
         switch (catalogDataverseMode) {
             case CATALOG:
-                catalog = dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()
-                        ? protocol.getDefaultDataverse() : dataverseCanonicalName;
+                if (dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()) {
+                    catalog = isDatabaseEntitySupported()
+                            ? protocol.getDefaultDatabase() + "/" + protocol.getDefaultDataverse()
+                            : protocol.getDefaultDataverse();
+                } else {
+                    catalog = dataverseCanonicalName;
+                }
                 // schema = null
                 break;
             case CATALOG_SCHEMA:
                 if (dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()) {
-                    catalog = protocol.getDefaultDataverse();
-                    // schema = null
+                    if (isDatabaseEntitySupported()) {
+                        catalog = protocol.getDefaultDatabase();
+                        schema = protocol.getDefaultDataverse();
+                    } else {
+                        catalog = protocol.getDefaultDataverse();
+                        // schema = null
+                    }
                 } else {
                     String[] parts = dataverseCanonicalName.split("/");
                     switch (parts.length) {
@@ -610,4 +622,28 @@
     public void setClientInfo(String name, String value) throws SQLClientInfoException {
         throw getErrorReporter().errorClientInfoMethodNotSupported(Connection.class, "setClientInfo");
     }
+
+    protected boolean checkDatabaseEntitySupport() throws SQLException {
+        checkClosed();
+
+        StringBuilder sql = new StringBuilder(256);
+        ADBStatement stmt = createStatementImpl();
+
+        sql.append("select count(*) ");
+        sql.append("from Metadata.`Dataset` ");
+        sql.append("where DataverseName='Metadata' and DatasetName='Database'");
+        ADBResultSet resultSet = stmt.executeQuery(sql.toString());
+        try {
+            if (resultSet.next()) {
+                return resultSet.getInt(1) > 0;
+            }
+            return false;
+        } finally {
+            stmt.close();
+        }
+    }
+
+    public boolean isDatabaseEntitySupported() {
+        return databaseEntitySupported;
+    }
 }
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
index 4525062..c2e555a 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
@@ -61,12 +61,20 @@
         sql.append("from Metadata.`Dataverse` ");
         switch (connection.catalogDataverseMode) {
             case CATALOG:
-                sql.append("let TABLE_CAT = DataverseName ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("let TABLE_CAT = DatabaseName || '/' || DataverseName ");
+                } else {
+                    sql.append("let TABLE_CAT = DataverseName ");
+                }
                 break;
             case CATALOG_SCHEMA:
-                sql.append("let name = decode_dataverse_name(DataverseName), ");
-                sql.append("TABLE_CAT = name[0] ");
-                sql.append("where (array_length(name) between 1 and 2) ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("let TABLE_CAT = DatabaseName ");
+                } else {
+                    sql.append("let name = decode_dataverse_name(DataverseName), ");
+                    sql.append("TABLE_CAT = name[0] ");
+                    sql.append("where (array_length(name) between 1 and 2) ");
+                }
                 sql.append("group by TABLE_CAT ");
                 break;
             default:
@@ -109,15 +117,24 @@
         sql.append("let ");
         switch (connection.catalogDataverseMode) {
             case CATALOG:
-                sql.append("TABLE_CATALOG = DataverseName, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CATALOG = DatabaseName || '/' || DataverseName, ");
+                } else {
+                    sql.append("TABLE_CATALOG = DataverseName, ");
+                }
                 sql.append("TABLE_SCHEM = null ");
                 sql.append("where true ");
                 break;
             case CATALOG_SCHEMA:
-                sql.append("name = decode_dataverse_name(DataverseName), ");
-                sql.append("TABLE_CATALOG = name[0], ");
-                sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
-                sql.append("where (array_length(name) between 1 and 2) ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CATALOG = DatabaseName, ");
+                    sql.append("TABLE_SCHEM = DataverseName ");
+                } else {
+                    sql.append("name = decode_dataverse_name(DataverseName), ");
+                    sql.append("TABLE_CATALOG = name[0], ");
+                    sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
+                    sql.append("where (array_length(name) between 1 and 2) ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -149,16 +166,28 @@
         sql.append("null TYPE_SCHEM, null TYPE_NAME, null SELF_REFERENCING_COL_NAME, null REF_GENERATION ");
         sql.append("from Metadata.`Dataset` ds join Metadata.`Datatype` dt ");
         sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        if (connection.isDatabaseEntitySupported()) {
+            sql.append("and ds.DatatypeDatabaseName = dt.DatabaseName ");
+        }
         sql.append("let ");
         switch (connection.catalogDataverseMode) {
             case CATALOG:
-                sql.append("TABLE_CAT = ds.DataverseName, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CAT = ds.DatabaseName || '/' || ds.DataverseName, ");
+                } else {
+                    sql.append("TABLE_CAT = ds.DataverseName, ");
+                }
                 sql.append("TABLE_SCHEM = null, ");
                 break;
             case CATALOG_SCHEMA:
-                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
-                sql.append("TABLE_CAT = dvname[0], ");
-                sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CAT = ds.DatabaseName, ");
+                    sql.append("TABLE_SCHEM = ds.DataverseName, ");
+                } else {
+                    sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                    sql.append("TABLE_CAT = dvname[0], ");
+                    sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -189,7 +218,9 @@
             case CATALOG:
                 break;
             case CATALOG_SCHEMA:
-                sql.append("and (array_length(dvname) between 1 and 2) ");
+                if (!connection.isDatabaseEntitySupported()) {
+                    sql.append("and (array_length(dvname) between 1 and 2) ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -221,20 +252,35 @@
         sql.append("from Metadata.`Dataset` ds ");
         sql.append("join Metadata.`Datatype` dt ");
         sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        if (connection.isDatabaseEntitySupported()) {
+            sql.append("and ds.DatatypeDatabaseName = dt.DatabaseName ");
+        }
         sql.append("unnest dt.Derived.Record.Fields as field at fieldpos ");
         sql.append("left join Metadata.`Datatype` dt2 ");
         sql.append(
                 "on field.FieldType = dt2.DatatypeName and ds.DataverseName = dt2.DataverseName and dt2.Derived is known ");
+        if (connection.isDatabaseEntitySupported()) {
+            sql.append("and ds.DatabaseName = dt2.DatabaseName ");
+        }
         sql.append("let ");
         switch (connection.catalogDataverseMode) {
             case CATALOG:
-                sql.append("TABLE_CAT = ds.DataverseName, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CAT = ds.DatabaseName || '/' || ds.DataverseName, ");
+                } else {
+                    sql.append("TABLE_CAT = ds.DataverseName, ");
+                }
                 sql.append("TABLE_SCHEM = null, ");
                 break;
             case CATALOG_SCHEMA:
-                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
-                sql.append("TABLE_CAT = dvname[0], ");
-                sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CAT = ds.DatabaseName, ");
+                    sql.append("TABLE_SCHEM = ds.DataverseName, ");
+                } else {
+                    sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                    sql.append("TABLE_CAT = dvname[0], ");
+                    sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -282,7 +328,9 @@
             case CATALOG:
                 break;
             case CATALOG_SCHEMA:
-                sql.append("and (array_length(dvname) between 1 and 2) ");
+                if (!connection.isDatabaseEntitySupported()) {
+                    sql.append("and (array_length(dvname) between 1 and 2) ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -304,18 +352,30 @@
         sql.append("from Metadata.`Dataset` ds ");
         sql.append("join Metadata.`Datatype` dt ");
         sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        if (connection.isDatabaseEntitySupported()) {
+            sql.append("and ds.DatatypeDatabaseName = dt.DatabaseName ");
+        }
         sql.append("unnest coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails).PrimaryKey pki at pkipos ");
         sql.append("let ");
         sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
         switch (connection.catalogDataverseMode) {
             case CATALOG:
-                sql.append("TABLE_CAT = ds.DataverseName, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CAT = ds.DatabaseName || '/' || ds.DataverseName, ");
+                } else {
+                    sql.append("TABLE_CAT = ds.DataverseName, ");
+                }
                 sql.append("TABLE_SCHEM = null, ");
                 break;
             case CATALOG_SCHEMA:
-                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
-                sql.append("TABLE_CAT = dvname[0], ");
-                sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("TABLE_CAT = ds.DatabaseName, ");
+                    sql.append("TABLE_SCHEM = ds.DataverseName, ");
+                } else {
+                    sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                    sql.append("TABLE_CAT = dvname[0], ");
+                    sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -340,7 +400,9 @@
             case CATALOG:
                 break;
             case CATALOG_SCHEMA:
-                sql.append("and (array_length(dvname) between 1 and 2) ");
+                if (!connection.isDatabaseEntitySupported()) {
+                    sql.append("and (array_length(dvname) between 1 and 2) ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -386,26 +448,44 @@
         sql.append("from Metadata.`Dataset` ds ");
         sql.append("join Metadata.`Datatype` dt ");
         sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        if (connection.isDatabaseEntitySupported()) {
+            sql.append("and ds.DatatypeDatabaseName = dt.DatabaseName ");
+        }
         sql.append("unnest coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails).ForeignKeys fk at fkpos ");
         sql.append("join Metadata.`Dataset` ds2 ");
         sql.append("on fk.RefDataverseName = ds2.DataverseName and fk.RefDatasetName = ds2.DatasetName ");
+        if (connection.isDatabaseEntitySupported()) {
+            sql.append("and fk.RefDatabaseName = ds2.DatabaseName ");
+        }
         sql.append("unnest fk.ForeignKey fki at fkipos ");
         sql.append("let ");
         sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
         switch (connection.catalogDataverseMode) {
             case CATALOG:
-                sql.append("FKTABLE_CAT = ds.DataverseName, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("FKTABLE_CAT = ds.DatabaseName || '/' || ds.DataverseName, ");
+                    sql.append("PKTABLE_CAT = ds2.DatabaseName || '/' || ds2.DataverseName, ");
+                } else {
+                    sql.append("FKTABLE_CAT = ds.DataverseName, ");
+                    sql.append("PKTABLE_CAT = ds2.DataverseName, ");
+                }
                 sql.append("FKTABLE_SCHEM = null, ");
-                sql.append("PKTABLE_CAT = ds2.DataverseName, ");
                 sql.append("PKTABLE_SCHEM = null, ");
                 break;
             case CATALOG_SCHEMA:
-                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
-                sql.append("FKTABLE_CAT = dvname[0], ");
-                sql.append("FKTABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
-                sql.append("dvname2 = decode_dataverse_name(ds2.DataverseName), ");
-                sql.append("PKTABLE_CAT = dvname2[0], ");
-                sql.append("PKTABLE_SCHEM = case array_length(dvname2) when 1 then null else dvname2[1] end, ");
+                if (connection.isDatabaseEntitySupported()) {
+                    sql.append("FKTABLE_CAT = ds.DatabaseName, ");
+                    sql.append("FKTABLE_SCHEM = ds.DataverseName, ");
+                    sql.append("PKTABLE_CAT = ds2.DatabaseName, ");
+                    sql.append("PKTABLE_SCHEM = ds2.DataverseName, ");
+                } else {
+                    sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                    sql.append("FKTABLE_CAT = dvname[0], ");
+                    sql.append("FKTABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                    sql.append("dvname2 = decode_dataverse_name(ds2.DataverseName), ");
+                    sql.append("PKTABLE_CAT = dvname2[0], ");
+                    sql.append("PKTABLE_SCHEM = case array_length(dvname2) when 1 then null else dvname2[1] end, ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
@@ -447,8 +527,10 @@
             case CATALOG:
                 break;
             case CATALOG_SCHEMA:
-                sql.append("and (array_length(dvname) between 1 and 2) ");
-                sql.append("and (array_length(dvname2) between 1 and 2) ");
+                if (!connection.isDatabaseEntitySupported()) {
+                    sql.append("and (array_length(dvname) between 1 and 2) ");
+                    sql.append("and (array_length(dvname2) between 1 and 2) ");
+                }
                 break;
             default:
                 throw new IllegalStateException();
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java
index 3186aa0..0235edd 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java
@@ -62,6 +62,7 @@
     public static final String PLAN_FORMAT_STRING = "string";
 
     private static final String DEFAULT_DATAVERSE = "Default";
+    private static final String DEFAULT_DATABASE = "Default";
     private static final String OPTIONAL_TYPE_SUFFIX = "?";
     private static final String EXPLAIN_ONLY_RESULT_COLUMN_NAME = "$1";
 
@@ -259,6 +260,10 @@
         return DEFAULT_DATAVERSE;
     }
 
+    public String getDefaultDatabase() {
+        return DEFAULT_DATABASE;
+    }
+
     public static class SubmitStatementOptions {
         public String dataverseName;
         public int timeoutSeconds;