[NO ISSUE][JDBC] Support foreign key metadata, other improvements
Change-Id: I01dcaf1e9ade568363df51f58f412956c9e0da45
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb-clients/+/13584
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
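
The foreign key metadata added here is reachable through the standard java.sql.DatabaseMetaData calls that this change wires up (getImportedKeys, getExportedKeys, getCrossReference), together with the new connection properties introduced below (catalogDataverseMode, catalogIncludesSchemaless, sqlCompatMode). A minimal usage sketch, not part of the patch; the JDBC URL scheme, host, port, and dataset names are illustrative and depend on the concrete driver build:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;

public class ForeignKeyMetadataSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("catalogDataverseMode", "1");           // 1 -> CATALOG, 2 -> CATALOG_SCHEMA
        props.setProperty("catalogIncludesSchemaless", "false");  // hide schemaless datasets/views
        props.setProperty("sqlCompatMode", "true");               // user statements run in SQL-compat mode
        // Illustrative URL; use the scheme and endpoint of the actual driver build.
        try (Connection conn = DriverManager.getConnection("jdbc:asterixdb://localhost:19002/tpch", props)) {
            DatabaseMetaData md = conn.getMetaData();
            // Foreign keys declared on dataset "Orders" (dataset name is an example)
            try (ResultSet rs = md.getImportedKeys(conn.getCatalog(), conn.getSchema(), "Orders")) {
                while (rs.next()) {
                    System.out.printf("%s.%s -> %s.%s (seq %d, %s)%n",
                            rs.getString("FKTABLE_NAME"), rs.getString("FKCOLUMN_NAME"),
                            rs.getString("PKTABLE_NAME"), rs.getString("PKCOLUMN_NAME"),
                            rs.getInt("KEY_SEQ"), rs.getString("FK_NAME"));
                }
            }
        }
    }
}
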
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
index 46e38bc..3a0ae5e 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
@@ -49,35 +49,64 @@
public class ADBConnection extends ADBWrapperSupport implements Connection {
final ADBProtocol protocol;
-
final String url;
-
final String databaseVersion;
-
+ final ADBDriverProperty.CatalogDataverseMode catalogDataverseMode;
+ final boolean catalogIncludesSchemaless;
+ final boolean sqlCompatMode;
private final AtomicBoolean closed;
-
private final ConcurrentLinkedQueue<ADBStatement> statements;
-
private volatile SQLWarning warning;
-
private volatile ADBMetaStatement metaStatement;
-
- volatile String catalog;
-
- volatile String schema;
+ private volatile String catalog;
+ private volatile String schema;
// Lifecycle
- protected ADBConnection(ADBProtocol protocol, String url, String databaseVersion, String catalog, String schema,
- SQLWarning connectWarning) {
+ protected ADBConnection(ADBProtocol protocol, String url, String databaseVersion, String dataverseCanonicalName,
+ Map<ADBDriverProperty, Object> properties, SQLWarning connectWarning) throws SQLException {
this.url = Objects.requireNonNull(url);
this.protocol = Objects.requireNonNull(protocol);
this.databaseVersion = databaseVersion;
this.statements = new ConcurrentLinkedQueue<>();
this.warning = connectWarning;
- this.catalog = catalog;
- this.schema = schema;
this.closed = new AtomicBoolean(false);
+ this.sqlCompatMode = (Boolean) ADBDriverProperty.Common.SQL_COMPAT_MODE.fetchPropertyValue(properties);
+ this.catalogDataverseMode = getCatalogDataverseMode(protocol, properties);
+ this.catalogIncludesSchemaless =
+ (Boolean) ADBDriverProperty.Common.CATALOG_INCLUDES_SCHEMALESS.fetchPropertyValue(properties);
+ initCatalogSchema(protocol, dataverseCanonicalName);
+ }
+
+ private void initCatalogSchema(ADBProtocol protocol, String dataverseCanonicalName) throws SQLException {
+ switch (catalogDataverseMode) {
+ case CATALOG:
+ catalog = dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()
+ ? ADBProtocol.DEFAULT_DATAVERSE : dataverseCanonicalName;
+ // schema = null
+ break;
+ case CATALOG_SCHEMA:
+ if (dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()) {
+ catalog = ADBProtocol.DEFAULT_DATAVERSE;
+ // schema = null
+ } else {
+ String[] parts = dataverseCanonicalName.split("/");
+ switch (parts.length) {
+ case 1:
+ catalog = parts[0];
+ break;
+ case 2:
+ catalog = parts[0];
+ schema = parts[1];
+ break;
+ default:
+ throw protocol.getErrorReporter().errorInConnection(dataverseCanonicalName); //TODO:FIXME
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
}
@Override
@@ -239,7 +268,7 @@
}
private ADBStatement createStatementImpl() {
- ADBStatement stmt = new ADBStatement(this, catalog, schema);
+ ADBStatement stmt = new ADBStatement(this);
registerStatement(stmt);
return stmt;
}
@@ -280,7 +309,7 @@
}
private ADBPreparedStatement prepareStatementImpl(String sql) throws SQLException {
- ADBPreparedStatement stmt = new ADBPreparedStatement(this, sql, catalog, schema);
+ ADBPreparedStatement stmt = new ADBPreparedStatement(this, sql);
registerStatement(stmt);
return stmt;
}
@@ -328,9 +357,36 @@
@Override
public void setSchema(String schema) throws SQLException {
checkClosed();
+ if (catalogDataverseMode == ADBDriverProperty.CatalogDataverseMode.CATALOG
+ && (schema != null && !schema.isEmpty())) {
+ throw getErrorReporter().errorInConnection(schema); //TODO:FIXME:REVIEW make no-op?
+ }
this.schema = schema;
}
+ String getDataverseCanonicalName() {
+ switch (catalogDataverseMode) {
+ case CATALOG:
+ return catalog;
+ case CATALOG_SCHEMA:
+ String c = catalog;
+ String s = schema;
+ return s == null ? c : c + "/" + s;
+ default:
+ throw new IllegalStateException();
+ }
+ }
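
For orientation, a sketch of the mapping implemented by initCatalogSchema and getDataverseCanonicalName above; the dataverse names are examples only:

// catalogDataverseMode = CATALOG:
//   connect to "tpch"      -> catalog "tpch",     schema null
//   connect to "tpch/sf1"  -> catalog "tpch/sf1", schema null   (no splitting)
// catalogDataverseMode = CATALOG_SCHEMA:
//   connect to "tpch"      -> catalog "tpch",     schema null
//   connect to "tpch/sf1"  -> catalog "tpch",     schema "sf1"
//   three or more parts    -> connection error
// An empty or missing path falls back to ADBProtocol.DEFAULT_DATAVERSE.
// getDataverseCanonicalName() reverses this: catalog, or catalog + "/" + schema when a schema is set.
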
+
+ private static ADBDriverProperty.CatalogDataverseMode getCatalogDataverseMode(ADBProtocol protocol,
+ Map<ADBDriverProperty, Object> properties) throws SQLException {
+ int mode = ((Number) ADBDriverProperty.Common.CATALOG_DATAVERSE_MODE.fetchPropertyValue(properties)).intValue();
+ try {
+ return ADBDriverProperty.CatalogDataverseMode.valueOf(mode);
+ } catch (IllegalArgumentException e) {
+ throw protocol.getErrorReporter().errorInConnection(String.valueOf(mode)); //TODO:FIXME
+ }
+ }
+
// Statement lifecycle
private void registerStatement(ADBStatement stmt) {
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
index a090498..1d463dc 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
@@ -122,7 +122,7 @@
@Override
public ADBResultSet getSchemas() throws SQLException {
- return getSchemas(metaStatement.connection.catalog, null);
+ return metaStatement.executeGetSchemasQuery();
}
@Override
@@ -135,6 +135,15 @@
return METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8;
}
+ //TODO:document
+ private boolean supportsCatalogsInStatements() {
+ return false;
+ }
+
+ private boolean supportsSchemasInStatements() {
+ return false;
+ }
+
// Tables
@Override
@@ -214,18 +223,19 @@
@Override
public ADBResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {
- return metaStatement.executeEmptyResultQuery();
+ return metaStatement.executeGetImportedKeysQuery(catalog, schema, table);
}
@Override
public ADBResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException {
- return metaStatement.executeEmptyResultQuery();
+ return metaStatement.executeGetExportedKeysQuery(catalog, schema, table);
}
@Override
public ADBResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable,
String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException {
- return metaStatement.executeEmptyResultQuery();
+ return metaStatement.executeCrossReferenceQuery(parentCatalog, parentSchema, parentTable, foreignCatalog,
+ foreignSchema, foreignTable);
}
// Indexes
@@ -696,12 +706,12 @@
@Override
public boolean supportsCatalogsInDataManipulation() {
- return true;
+ return supportsCatalogsInStatements();
}
@Override
public boolean supportsSchemasInDataManipulation() {
- return true;
+ return supportsSchemasInStatements();
}
@Override
@@ -725,12 +735,12 @@
@Override
public boolean supportsCatalogsInTableDefinitions() {
- return true;
+ return supportsCatalogsInStatements();
}
@Override
public boolean supportsSchemasInTableDefinitions() {
- return true;
+ return supportsSchemasInStatements();
}
@Override
@@ -752,24 +762,24 @@
@Override
public boolean supportsCatalogsInIndexDefinitions() {
- return true;
+ return supportsCatalogsInStatements();
}
@Override
public boolean supportsSchemasInIndexDefinitions() {
- return true;
+ return supportsSchemasInStatements();
}
// DDL: GRANT / REVOKE (not supported)
@Override
public boolean supportsCatalogsInPrivilegeDefinitions() {
- return false;
+ return supportsCatalogsInStatements();
}
@Override
public boolean supportsSchemasInPrivilegeDefinitions() {
- return false;
+ return supportsSchemasInStatements();
}
@Override
@@ -786,12 +796,12 @@
@Override
public boolean supportsCatalogsInProcedureCalls() {
- return false;
+ return supportsCatalogsInStatements();
}
@Override
public boolean supportsSchemasInProcedureCalls() {
- return false;
+ return supportsSchemasInStatements();
}
// Transactions
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
index 5d89612..315f270 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
@@ -154,21 +154,6 @@
port = defaultApiPort;
}
- String catalog = ADBProtocol.DEFAULT_DATAVERSE;
- String schema = null;
- String path = subUri.getPath();
- if (path != null && path.length() > 1 && path.startsWith("/")) {
- String[] dataverse = path.substring(1).split("/");
- switch (dataverse.length) {
- case 2:
- schema = dataverse[1];
- // fall thru to 1
- case 1:
- catalog = dataverse[0];
- break;
- }
- }
-
List<NameValuePair> urlParams = URLEncodedUtils.parse(subUri, StandardCharsets.UTF_8);
ADBDriverContext driverContext = getOrCreateDriverContext();
@@ -177,10 +162,14 @@
parseConnectionProperties(urlParams, info, driverContext, properties, warning);
warning = warning.getNextWarning() != null ? warning.getNextWarning() : null;
+ String path = subUri.getPath();
+ String dataverseCanonicalName =
+ path != null && path.length() > 1 && path.startsWith("/") ? path.substring(1) : null;
+
ADBProtocol protocol = createProtocol(host, port, properties, driverContext);
try {
String databaseVersion = protocol.connect();
- return createConnection(protocol, url, databaseVersion, catalog, schema, warning);
+ return createConnection(protocol, url, databaseVersion, dataverseCanonicalName, properties, warning);
} catch (SQLException e) {
try {
protocol.close();
@@ -248,8 +237,9 @@
return new ADBProtocol(host, port, properties, driverContext);
}
- protected ADBConnection createConnection(ADBProtocol protocol, String url, String databaseVersion, String catalog,
- String schema, SQLWarning connectWarning) {
- return new ADBConnection(protocol, url, databaseVersion, catalog, schema, connectWarning);
+ protected ADBConnection createConnection(ADBProtocol protocol, String url, String databaseVersion,
+ String dataverseCanonicalName, Map<ADBDriverProperty, Object> properties, SQLWarning connectWarning)
+ throws SQLException {
+ return new ADBConnection(protocol, url, databaseVersion, dataverseCanonicalName, properties, connectWarning);
}
}
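
The dataverse encoded in the JDBC URL path is now handed to ADBConnection unparsed; a sketch of the resulting mapping (URLs are examples, the scheme prefix depends on the concrete driver):

// jdbc:...://host:port            -> dataverseCanonicalName = null  (connection falls back to the default dataverse)
// jdbc:...://host:port/tpch       -> dataverseCanonicalName = "tpch"
// jdbc:...://host:port/tpch/sf1   -> dataverseCanonicalName = "tpch/sf1"
// Splitting into catalog/schema now happens in ADBConnection.initCatalogSchema(),
// driven by the catalogDataverseMode property, instead of here.
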
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
index 89f01e9..4b57f7d 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
@@ -31,7 +31,7 @@
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;
-final class ADBDriverContext {
+public class ADBDriverContext {
final Class<? extends ADBDriverBase> driverClass;
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
index 1d1e67e..37cf57e 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
@@ -19,10 +19,11 @@
package org.apache.asterix.jdbc.core;
+import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
-interface ADBDriverProperty {
+public interface ADBDriverProperty {
String getPropertyName();
@@ -36,7 +37,10 @@
PASSWORD("password", Function.identity(), null),
CONNECT_TIMEOUT("connectTimeout", Integer::parseInt, null),
SOCKET_TIMEOUT("socketTimeout", Integer::parseInt, null),
- MAX_WARNINGS("maxWarnings", Integer::parseInt, 10);
+ MAX_WARNINGS("maxWarnings", Integer::parseInt, 10),
+ CATALOG_DATAVERSE_MODE("catalogDataverseMode", Integer::parseInt, 1), // 1 -> CATALOG, 2 -> CATALOG_SCHEMA
+ CATALOG_INCLUDES_SCHEMALESS("catalogIncludesSchemaless", Boolean::parseBoolean, false),
+ SQL_COMPAT_MODE("sqlCompatMode", Boolean::parseBoolean, true); // Whether user statements are executed in 'SQL-compat' mode
private final String propertyName;
@@ -67,5 +71,25 @@
public String toString() {
return getPropertyName();
}
+
+ public Object fetchPropertyValue(Map<ADBDriverProperty, Object> properties) {
+ return properties.getOrDefault(this, defaultValue);
+ }
+ }
+
+ enum CatalogDataverseMode {
+ CATALOG,
+ CATALOG_SCHEMA;
+
+ static CatalogDataverseMode valueOf(int n) {
+ switch (n) {
+ case 1:
+ return CATALOG;
+ case 2:
+ return CATALOG_SCHEMA;
+ default:
+ throw new IllegalArgumentException(String.valueOf(n));
+ }
+ }
}
}
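
A small sketch of the new property helpers; the property map contents are illustrative and the class is assumed to sit in the same package as the driver:

import java.util.HashMap;
import java.util.Map;

class PropertyLookupSketch {
    static void demo() {
        Map<ADBDriverProperty, Object> props = new HashMap<>();
        // Absent properties resolve to their declared defaults (e.g. 10 for maxWarnings, 1 for catalogDataverseMode)
        Number maxWarnings = (Number) ADBDriverProperty.Common.MAX_WARNINGS.fetchPropertyValue(props);
        int modeCode = ((Number) ADBDriverProperty.Common.CATALOG_DATAVERSE_MODE.fetchPropertyValue(props)).intValue();
        // 1 -> CATALOG, 2 -> CATALOG_SCHEMA, anything else -> IllegalArgumentException
        ADBDriverProperty.CatalogDataverseMode mode = ADBDriverProperty.CatalogDataverseMode.valueOf(modeCode);
        System.out.println(maxWarnings + " " + mode);
    }
}
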
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
index f31e18a..4f409b7 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
@@ -20,6 +20,7 @@
package org.apache.asterix.jdbc.core;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
@@ -110,7 +111,7 @@
protected SQLInvalidAuthorizationSpecException errorAuth() {
return new SQLInvalidAuthorizationSpecException("Authentication/authorization error",
- SQLState.CONNECTION_REJECTED.code);
+ SQLState.INVALID_AUTH_SPEC.code);
}
protected SQLException errorColumnNotFound(String columnNameOrNumber) {
@@ -163,6 +164,10 @@
return new SQLException(String.format("Cannot create request. %s", getMessage(e)), e);
}
+ protected SQLException errorInRequestURIGeneration(URISyntaxException e) {
+ return new SQLException(String.format("Cannot create request URI. %s", getMessage(e)), e);
+ }
+
protected SQLException errorInResultHandling(IOException e) {
return new SQLException(String.format("Cannot read result. %s", getMessage(e)), e);
}
@@ -193,8 +198,8 @@
}
public enum SQLState {
- CONNECTION_REJECTED("08004"),
- CONNECTION_FAILURE("08006"),
+ CONNECTION_FAILURE("08001"), // TODO:08006??
+ INVALID_AUTH_SPEC("28000"),
INVALID_DATE_TYPE("HY004"),
INVALID_CURSOR_POSITION("HY108");
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
index 7fa8127..bea8c26 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
@@ -37,41 +37,95 @@
public static final String TABLE = "TABLE";
public static final String VIEW = "VIEW";
+ private static final String PK_NAME_SUFFIX = "_pk";
+ private static final String FK_NAME_SUFFIX = "_fk";
+
protected ADBMetaStatement(ADBConnection connection) {
- super(connection, null, null);
+ super(connection);
+ }
+
+ protected void populateQueryProlog(StringBuilder sql, String comment) {
+ if (comment != null) {
+ sql.append("/* ").append(comment).append(" */\n");
+ }
+ //sql.append("set `compiler.min.memory.allocation` 'false';\n");
}
ADBResultSet executeGetCatalogsQuery() throws SQLException {
checkClosed();
StringBuilder sql = new StringBuilder(256);
+ populateQueryProlog(sql, "JDBC-GetCatalogs");
sql.append("select TABLE_CAT ");
sql.append("from Metadata.`Dataverse` ");
- sql.append("let name = decode_dataverse_name(DataverseName), ");
- sql.append("TABLE_CAT = name[0] ");
- sql.append("where array_length(name) between 1 and 2 ");
- sql.append("group by TABLE_CAT ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ sql.append("let TABLE_CAT = DataverseName ");
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("let name = decode_dataverse_name(DataverseName), ");
+ sql.append("TABLE_CAT = name[0] ");
+ sql.append("where (array_length(name) between 1 and 2) ");
+ sql.append("group by TABLE_CAT ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
sql.append("order by TABLE_CAT");
return executeQueryImpl(sql.toString(), null);
}
+ ADBResultSet executeGetSchemasQuery() throws SQLException {
+ String catalog;
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ catalog = connection.getDataverseCanonicalName();
+ break;
+ case CATALOG_SCHEMA:
+ catalog = connection.getCatalog();
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ return executeGetSchemasQuery(catalog, null, "0");
+ }
+
ADBResultSet executeGetSchemasQuery(String catalog, String schemaPattern) throws SQLException {
+ return executeGetSchemasQuery(catalog, schemaPattern, "1");
+ }
+
+ ADBResultSet executeGetSchemasQuery(String catalog, String schemaPattern, String tag) throws SQLException {
checkClosed();
StringBuilder sql = new StringBuilder(512);
+ populateQueryProlog(sql, "JDBC-GetSchemas-" + tag);
+
sql.append("select TABLE_SCHEM, TABLE_CATALOG ");
sql.append("from Metadata.`Dataverse` ");
- sql.append("let name = decode_dataverse_name(DataverseName), ");
- sql.append("TABLE_CATALOG = name[0], ");
- sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
- sql.append("where array_length(name) between 1 and 2 ");
+ sql.append("let ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ sql.append("TABLE_CATALOG = DataverseName, ");
+ sql.append("TABLE_SCHEM = null ");
+ sql.append("where true ");
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("name = decode_dataverse_name(DataverseName), ");
+ sql.append("TABLE_CATALOG = name[0], ");
+ sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
+ sql.append("where (array_length(name) between 1 and 2) ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
if (catalog != null) {
- sql.append("and TABLE_CATALOG = $1 ");
+ sql.append("and (TABLE_CATALOG = $1) ");
}
if (schemaPattern != null) {
- sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+ sql.append("and (if_null(TABLE_SCHEM, '') like $2) ");
}
sql.append("order by TABLE_CATALOG, TABLE_SCHEM");
@@ -88,38 +142,65 @@
String viewTermNonTabular = getViewTerm(false);
StringBuilder sql = new StringBuilder(1024);
+ populateQueryProlog(sql, "JDBC-GetTables");
+
sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, null REMARKS, null TYPE_CAT, ");
sql.append("null TYPE_SCHEM, null TYPE_NAME, null SELF_REFERENCING_COL_NAME, null REF_GENERATION ");
sql.append("from Metadata.`Dataset` ds join Metadata.`Datatype` dt ");
sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
- sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
+ sql.append("let ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ sql.append("TABLE_CAT = ds.DataverseName, ");
+ sql.append("TABLE_SCHEM = null, ");
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+ sql.append("TABLE_CAT = dvname[0], ");
+ sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ sql.append("TABLE_NAME = ds.DatasetName, ");
sql.append("isDataset = (ds.DatasetType = 'INTERNAL' or ds.DatasetType = 'EXTERNAL'), ");
sql.append("isView = ds.DatasetType = 'VIEW', ");
sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
- sql.append("TABLE_CAT = dvname[0], ");
- sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
- sql.append("TABLE_NAME = ds.DatasetName, ");
sql.append("TABLE_TYPE = case ");
sql.append("when isDataset then (case when hasFields then '").append(datasetTermTabular).append("' else '")
.append(datasetTermNonTabular).append("' end) ");
sql.append("when isView then (case when hasFields then '").append(viewTermTabular).append("' else '")
.append(viewTermNonTabular).append("' end) ");
sql.append("else null end ");
- sql.append("where array_length(dvname) between 1 and 2 ");
+
+ sql.append("where ");
+ sql.append("(TABLE_TYPE ").append(types != null ? "in $1" : "is not null").append(") ");
if (catalog != null) {
- sql.append("and TABLE_CAT = $1 ");
+ sql.append("and (TABLE_CAT = $2) ");
}
if (schemaPattern != null) {
- sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+ sql.append("and (if_null(TABLE_SCHEM, '') like $3) ");
}
if (tableNamePattern != null) {
- sql.append("and TABLE_NAME like $3 ");
+ sql.append("and (TABLE_NAME like $4) ");
}
- sql.append("and TABLE_TYPE ").append(types != null ? "in $4" : "is not null").append(" ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("and (array_length(dvname) between 1 and 2) ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ if (!connection.catalogIncludesSchemaless) {
+ sql.append("and hasFields ");
+ }
+
sql.append("order by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME");
- return executeQueryImpl(sql.toString(),
- Arrays.asList(catalog, schemaPattern, tableNamePattern, types != null ? Arrays.asList(types) : null));
+ List<String> typesList = types != null ? Arrays.asList(types) : null;
+ return executeQueryImpl(sql.toString(), Arrays.asList(typesList, catalog, schemaPattern, tableNamePattern));
}
ADBResultSet executeGetColumnsQuery(String catalog, String schemaPattern, String tableNamePattern,
@@ -127,6 +208,8 @@
checkClosed();
StringBuilder sql = new StringBuilder(2048);
+ populateQueryProlog(sql, "JDBC-GetColumns");
+
sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE, TYPE_NAME, COLUMN_SIZE, ");
sql.append("1 BUFFER_LENGTH, null DECIMAL_DIGITS, 2 NUM_PREC_RADIX, NULLABLE, ");
sql.append("null REMARKS, null COLUMN_DEF, DATA_TYPE SQL_DATA_TYPE,");
@@ -141,9 +224,20 @@
sql.append("left join Metadata.`Datatype` dt2 ");
sql.append(
"on field.FieldType = dt2.DatatypeName and ds.DataverseName = dt2.DataverseName and dt2.Derived is known ");
- sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
- sql.append("TABLE_CAT = dvname[0], ");
- sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+ sql.append("let ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ sql.append("TABLE_CAT = ds.DataverseName, ");
+ sql.append("TABLE_SCHEM = null, ");
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+ sql.append("TABLE_CAT = dvname[0], ");
+ sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
sql.append("TABLE_NAME = ds.DatasetName, ");
sql.append("COLUMN_NAME = field.FieldName, ");
sql.append("TYPE_NAME = case ");
@@ -169,21 +263,30 @@
sql.append("COLUMN_SIZE = case field.FieldType when 'string' then 32767 else 8 end, "); // TODO:based on type
sql.append("ORDINAL_POSITION = fieldpos, ");
sql.append("NULLABLE = case when field.IsNullable or field.IsMissable then 1 else 0 end ");
- sql.append("where array_length(dvname) between 1 and 2 ");
- sql.append("and array_length(dt.Derived.Record.Fields) > 0 ");
+ sql.append("where (array_length(dt.Derived.Record.Fields) > 0) ");
if (catalog != null) {
- sql.append("and TABLE_CAT = $1 ");
+ sql.append("and (TABLE_CAT = $1) ");
}
if (schemaPattern != null) {
- sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+ sql.append("and (if_null(TABLE_SCHEM, '') like $2) ");
}
if (tableNamePattern != null) {
- sql.append("and TABLE_NAME like $3 ");
+ sql.append("and (TABLE_NAME like $3) ");
}
if (columnNamePattern != null) {
- sql.append("and COLUMN_NAME like $4 ");
+ sql.append("and (COLUMN_NAME like $4) ");
}
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("and (array_length(dvname) between 1 and 2) ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
sql.append("order by TABLE_CAT, TABLE_SCHEM, TABLE_NAME, ORDINAL_POSITION");
return executeQueryImpl(sql.toString(),
@@ -194,39 +297,181 @@
checkClosed();
StringBuilder sql = new StringBuilder(1024);
- sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, null PK_NAME ");
- sql.append("from Metadata.`Dataset` ds unnest ds.InternalDetails.PrimaryKey pki at pkipos ");
- sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
- sql.append("TABLE_CAT = dvname[0], ");
- sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+ populateQueryProlog(sql, "JDBC-GetPrimaryKeys");
+
+ sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, PK_NAME ");
+ sql.append("from Metadata.`Dataset` ds ");
+ sql.append("join Metadata.`Datatype` dt ");
+ sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+ sql.append("unnest coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails).PrimaryKey pki at pkipos ");
+ sql.append("let ");
+ sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ sql.append("TABLE_CAT = ds.DataverseName, ");
+ sql.append("TABLE_SCHEM = null, ");
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+ sql.append("TABLE_CAT = dvname[0], ");
+ sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
sql.append("TABLE_NAME = ds.DatasetName, ");
sql.append("COLUMN_NAME = pki[0], ");
- sql.append("KEY_SEQ = pkipos ");
- sql.append("where array_length(dvname) between 1 and 2 ");
- sql.append("and (every pk in ds.InternalDetails.PrimaryKey satisfies array_length(pk) = 1 end) ");
- sql.append("and (every si in ds.InternalDetails.KeySourceIndicator satisfies si = 0 end ) ");
+ sql.append("KEY_SEQ = pkipos, ");
+ sql.append("PK_NAME = TABLE_NAME || '").append(PK_NAME_SUFFIX).append("', ");
+ sql.append("dsDetails = coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails) ");
+ sql.append("where (every pk in dsDetails.PrimaryKey satisfies array_length(pk) = 1 end) ");
+ sql.append("and (every si in dsDetails.KeySourceIndicator satisfies si = 0 end ) ");
if (catalog != null) {
- sql.append("and TABLE_CAT = $1 ");
+ sql.append("and (TABLE_CAT = $1) ");
}
if (schema != null) {
- sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+ sql.append("and (if_null(TABLE_SCHEM, '') like $2) ");
}
if (table != null) {
- sql.append("and TABLE_NAME like $3 ");
+ sql.append("and (TABLE_NAME like $3) ");
}
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("and (array_length(dvname) between 1 and 2) ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ if (!connection.catalogIncludesSchemaless) {
+ sql.append("and hasFields ");
+ }
+
sql.append("order by COLUMN_NAME");
return executeQueryImpl(sql.toString(), Arrays.asList(catalog, schema, table));
}
+ ADBResultSet executeGetImportedKeysQuery(String catalog, String schema, String table) throws SQLException {
+ return executeGetImportedExportedKeysQuery("JDBC-GetImportedKeys", null, null, null, catalog, schema, table,
+ false);
+ }
+
+ ADBResultSet executeGetExportedKeysQuery(String catalog, String schema, String table) throws SQLException {
+ return executeGetImportedExportedKeysQuery("JDBC-GetExportedKeys", catalog, schema, table, null, null, null,
+ true);
+ }
+
+ ADBResultSet executeCrossReferenceQuery(String parentCatalog, String parentSchema, String parentTable,
+ String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException {
+ return executeGetImportedExportedKeysQuery("JDBC-CrossReference", parentCatalog, parentSchema, parentTable,
+ foreignCatalog, foreignSchema, foreignTable, true);
+ }
+
+ protected ADBResultSet executeGetImportedExportedKeysQuery(String comment, String pkCatalog, String pkSchema,
+ String pkTable, String fkCatalog, String fkSchema, String fkTable, boolean orderByFk) throws SQLException {
+ StringBuilder sql = new StringBuilder(2048);
+ populateQueryProlog(sql, comment);
+
+ sql.append("select PKTABLE_CAT, PKTABLE_SCHEM, PKTABLE_NAME, PKCOLUMN_NAME, ");
+ sql.append("FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME, FKCOLUMN_NAME, KEY_SEQ, ");
+ sql.append(DatabaseMetaData.importedKeyNoAction).append(" UPDATE_RULE, ");
+ sql.append(DatabaseMetaData.importedKeyNoAction).append(" DELETE_RULE, ");
+ sql.append("FK_NAME, PK_NAME, ");
+ sql.append(DatabaseMetaData.importedKeyInitiallyDeferred).append(" DEFERRABILITY ");
+ sql.append("from Metadata.`Dataset` ds ");
+ sql.append("join Metadata.`Datatype` dt ");
+ sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+ sql.append("unnest coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails).ForeignKeys fk at fkpos ");
+ sql.append("join Metadata.`Dataset` ds2 ");
+ sql.append("on fk.RefDataverseName = ds2.DataverseName and fk.RefDatasetName = ds2.DatasetName ");
+ sql.append("unnest fk.ForeignKey fki at fkipos ");
+ sql.append("let ");
+ sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ sql.append("FKTABLE_CAT = ds.DataverseName, ");
+ sql.append("FKTABLE_SCHEM = null, ");
+ sql.append("PKTABLE_CAT = ds2.DataverseName, ");
+ sql.append("PKTABLE_SCHEM = null, ");
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+ sql.append("FKTABLE_CAT = dvname[0], ");
+ sql.append("FKTABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+ sql.append("dvname2 = decode_dataverse_name(ds2.DataverseName), ");
+ sql.append("PKTABLE_CAT = dvname2[0], ");
+ sql.append("PKTABLE_SCHEM = case array_length(dvname2) when 1 then null else dvname2[1] end, ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ sql.append("ds2Details = coalesce(ds2.InternalDetails, ds2.ExternalDetails, ds2.ViewDetails), ");
+ sql.append("FKTABLE_NAME = ds.DatasetName, ");
+ sql.append("PKTABLE_NAME = ds2.DatasetName, ");
+ sql.append("FKCOLUMN_NAME = fki[0], ");
+ sql.append("PKCOLUMN_NAME = ds2Details.PrimaryKey[fkipos-1][0], ");
+ sql.append("KEY_SEQ = fkipos, ");
+ sql.append("PK_NAME = PKTABLE_NAME || '").append(PK_NAME_SUFFIX).append("', ");
+ sql.append("FK_NAME = FKTABLE_NAME || '").append(FK_NAME_SUFFIX).append("_' || string(fkpos) ");
+ sql.append("where (every fki2 in fk.ForeignKey satisfies array_length(fki2) = 1 end) ");
+ sql.append("and (every fksi in fk.KeySourceIndicator satisfies fksi = 0 end ) ");
+ sql.append("and (every pki in ds2Details.PrimaryKey satisfies array_length(pki) = 1 end) ");
+ sql.append("and (every pksi in ds2Details.KeySourceIndicator satisfies pksi = 0 end) ");
+
+ if (pkCatalog != null) {
+ sql.append("and (").append("PKTABLE_CAT").append(" = $1) ");
+ }
+ if (pkSchema != null) {
+ sql.append("and (if_null(").append("PKTABLE_SCHEM").append(", '') like $2) ");
+ }
+ if (pkTable != null) {
+ sql.append("and (").append("PKTABLE_NAME").append(" like $3) ");
+ }
+
+ if (fkCatalog != null) {
+ sql.append("and (").append("FKTABLE_CAT").append(" = $4) ");
+ }
+ if (fkSchema != null) {
+ sql.append("and (if_null(").append("FKTABLE_SCHEM").append(", '') like $5) ");
+ }
+ if (fkTable != null) {
+ sql.append("and (").append("FKTABLE_NAME").append(" like $6) ");
+ }
+
+ switch (connection.catalogDataverseMode) {
+ case CATALOG:
+ break;
+ case CATALOG_SCHEMA:
+ sql.append("and (array_length(dvname) between 1 and 2) ");
+ sql.append("and (array_length(dvname2) between 1 and 2) ");
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ if (!connection.catalogIncludesSchemaless) {
+ sql.append("and hasFields ");
+ }
+
+ sql.append("order by ").append(
+ orderByFk ? "FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME" : "PKTABLE_CAT, PKTABLE_SCHEM, PKTABLE_NAME")
+ .append(", KEY_SEQ");
+
+ return executeQueryImpl(sql.toString(),
+ Arrays.asList(pkCatalog, pkSchema, pkTable, fkCatalog, fkSchema, fkTable));
+ }
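
The PK_NAME/FK_NAME values returned by the queries above are synthesized from dataset names; a sketch of the convention (dataset names are examples only):

// primary key of dataset "Orders"                 -> PK_NAME = "Orders_pk"
// 1st foreign key declared on dataset "LineItem"  -> FK_NAME = "LineItem_fk_1"
// 2nd foreign key declared on dataset "LineItem"  -> FK_NAME = "LineItem_fk_2"
// KEY_SEQ is 1-based and follows the column position within the key; datasets whose key
// components are nested field paths are excluded by the "every ... satisfies" predicates.
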
+
ADBResultSet executeGetTableTypesQuery() throws SQLException {
checkClosed();
LinkedHashSet<String> tableTypes = new LinkedHashSet<>();
tableTypes.add(getDatasetTerm(true));
- tableTypes.add(getDatasetTerm(false));
tableTypes.add(getViewTerm(true));
- tableTypes.add(getViewTerm(false));
+ if (connection.catalogIncludesSchemaless) {
+ tableTypes.add(getDatasetTerm(false));
+ tableTypes.add(getViewTerm(false));
+ }
List<ADBColumn> columns = Collections.singletonList(new ADBColumn("TABLE_TYPE", ADBDatatype.STRING, false));
@@ -340,6 +585,14 @@
return null;
}
+ @Override
+ protected ADBProtocol.SubmitStatementOptions createSubmitStatementOptions() {
+ ADBProtocol.SubmitStatementOptions options = super.createSubmitStatementOptions();
+ // Metadata queries are always executed in SQL++ mode
+ options.sqlCompatMode = false;
+ return options;
+ }
+
protected String getDatasetTerm(boolean tabular) {
return tabular ? TABLE : SCHEMALESS + " " + TABLE;
}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
index 98129fd..aa88f1c 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
@@ -49,11 +49,12 @@
final List<ADBColumn> resultColumns;
- ADBPreparedStatement(ADBConnection connection, String sql, String catalog, String schema) throws SQLException {
- super(connection, catalog, schema);
- // TODO:timeout
- ADBProtocol.QueryServiceResponse response =
- connection.protocol.submitStatement(sql, null, false, true, 0, catalog, schema);
+ ADBPreparedStatement(ADBConnection connection, String sql) throws SQLException {
+ super(connection);
+ ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ stmtOptions.compileOnly = true;
+ stmtOptions.timeoutSeconds = 0; /* TODO:timeout */
+ ADBProtocol.QueryServiceResponse response = connection.protocol.submitStatement(sql, null, null, stmtOptions);
int parameterCount = connection.protocol.getStatementParameterCount(response);
boolean isQuery = connection.protocol.isStatementCategory(response,
ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
@@ -294,7 +295,7 @@
@Override
public void setDate(int parameterIndex, java.sql.Date v, Calendar cal) throws SQLException {
checkClosed();
- setDate(parameterIndex, v);
+ setArg(parameterIndex, cal != null ? new SqlCalendarDate(v, cal.getTimeZone()) : v);
}
@Override
@@ -306,7 +307,7 @@
@Override
public void setTime(int parameterIndex, java.sql.Time v, Calendar cal) throws SQLException {
checkClosed();
- setTime(parameterIndex, v);
+ setArg(parameterIndex, cal != null ? new SqlCalendarTime(v, cal.getTimeZone()) : v);
}
@Override
@@ -318,7 +319,7 @@
@Override
public void setTimestamp(int parameterIndex, java.sql.Timestamp v, Calendar cal) throws SQLException {
checkClosed();
- setTimestamp(parameterIndex, v);
+ setArg(parameterIndex, cal != null ? new SqlCalendarTimestamp(v, cal.getTimeZone()) : v);
}
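
With this change the Calendar argument to setDate/setTime/setTimestamp is honoured on the bind side as well; a minimal sketch, with an illustrative query, assuming the wrapped value is serialized in the calendar's time zone:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.TimeZone;

class CalendarBindSketch {
    static void bind(Connection conn) throws Exception {
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        try (PreparedStatement ps = conn.prepareStatement("select count(*) from Orders where ts >= ?")) {
            // Interpreted in UTC instead of the JVM default time zone
            ps.setTimestamp(1, new Timestamp(System.currentTimeMillis()), utc);
            try (ResultSet rs = ps.executeQuery()) {
                rs.next();
            }
        }
    }
}
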
// Generic (setObject)
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
index e8a36f5..fed6d6a 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
@@ -36,6 +36,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -50,10 +51,12 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ContentProducer;
@@ -84,8 +87,9 @@
public class ADBProtocol {
- private static final String QUERY_ENDPOINT_PATH = "/query/service";
+ private static final String QUERY_SERVICE_ENDPOINT_PATH = "/query/service";
private static final String QUERY_RESULT_ENDPOINT_PATH = "/query/service/result";
+ private static final String ACTIVE_REQUESTS_ENDPOINT_PATH = "/admin/requests/running";
private static final String STATEMENT = "statement";
private static final String ARGS = "args";
@@ -98,6 +102,8 @@
private static final String CLIENT_TYPE = "client-type";
private static final String PLAN_FORMAT = "plan-format";
private static final String MAX_WARNINGS = "max-warnings";
+ private static final String SQL_COMPAT = "sql-compat";
+ private static final String CLIENT_CONTEXT_ID = "client_context_id";
private static final String MODE_DEFERRED = "deferred";
private static final String CLIENT_TYPE_JDBC = "jdbc";
@@ -116,20 +122,23 @@
final CloseableHttpClient httpClient;
final URI queryEndpoint;
final URI queryResultEndpoint;
+ final URI activeRequestsEndpoint;
final String user;
final int maxWarnings;
protected ADBProtocol(String host, int port, Map<ADBDriverProperty, Object> params, ADBDriverContext driverContext)
throws SQLException {
- URI queryEndpoint = createEndpointUri(host, port, QUERY_ENDPOINT_PATH, driverContext.errorReporter);
+ URI queryEndpoint = createEndpointUri(host, port, getQueryServiceEndpointPath(), driverContext.errorReporter);
URI queryResultEndpoint =
- createEndpointUri(host, port, QUERY_RESULT_ENDPOINT_PATH, driverContext.errorReporter);
+ createEndpointUri(host, port, getQueryResultEndpointPath(), driverContext.errorReporter);
+ URI activeRequestsEndpoint =
+ createEndpointUri(host, port, getActiveRequestsEndpointPath(), driverContext.errorReporter);
PoolingHttpClientConnectionManager httpConnectionManager = new PoolingHttpClientConnectionManager();
int maxConnections = Math.max(16, Runtime.getRuntime().availableProcessors());
httpConnectionManager.setDefaultMaxPerRoute(maxConnections);
httpConnectionManager.setMaxTotal(maxConnections);
SocketConfig.Builder socketConfigBuilder = null;
- Number socketTimeoutMillis = (Number) params.get(ADBDriverProperty.Common.SOCKET_TIMEOUT);
+ Number socketTimeoutMillis = (Number) ADBDriverProperty.Common.SOCKET_TIMEOUT.fetchPropertyValue(params);
if (socketTimeoutMillis != null) {
socketConfigBuilder = SocketConfig.custom();
socketConfigBuilder.setSoTimeout(socketTimeoutMillis.intValue());
@@ -138,7 +147,7 @@
httpConnectionManager.setDefaultSocketConfig(socketConfigBuilder.build());
}
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
- Number connectTimeoutMillis = (Number) params.get(ADBDriverProperty.Common.CONNECT_TIMEOUT);
+ Number connectTimeoutMillis = (Number) ADBDriverProperty.Common.CONNECT_TIMEOUT.fetchPropertyValue(params);
if (connectTimeoutMillis != null) {
requestConfigBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue());
requestConfigBuilder.setConnectTimeout(connectTimeoutMillis.intValue());
@@ -152,18 +161,17 @@
httpClientBuilder.setConnectionManager(httpConnectionManager);
httpClientBuilder.setConnectionManagerShared(true);
httpClientBuilder.setDefaultRequestConfig(requestConfig);
- String user = (String) params.get(ADBDriverProperty.Common.USER);
+ String user = (String) ADBDriverProperty.Common.USER.fetchPropertyValue(params);
if (user != null) {
- String password = (String) params.get(ADBDriverProperty.Common.PASSWORD);
+ String password = (String) ADBDriverProperty.Common.PASSWORD.fetchPropertyValue(params);
httpClientBuilder.setDefaultCredentialsProvider(createCredentialsProvider(user, password));
}
-
- Number maxWarnings = ((Number) params.getOrDefault(ADBDriverProperty.Common.MAX_WARNINGS,
- ADBDriverProperty.Common.MAX_WARNINGS.getDefaultValue()));
+ Number maxWarnings = (Number) ADBDriverProperty.Common.MAX_WARNINGS.fetchPropertyValue(params);
this.user = user;
this.queryEndpoint = queryEndpoint;
this.queryResultEndpoint = queryResultEndpoint;
+ this.activeRequestsEndpoint = activeRequestsEndpoint;
this.httpConnectionManager = httpConnectionManager;
this.httpClient = httpClientBuilder.build();
this.httpClientContext = createHttpClientContext(queryEndpoint);
@@ -239,8 +247,8 @@
}
}
- QueryServiceResponse submitStatement(String sql, List<?> args, boolean forceReadOnly, boolean compileOnly,
- int timeoutSeconds, String catalog, String schema) throws SQLException {
+ QueryServiceResponse submitStatement(String sql, List<?> args, UUID executionId, SubmitStatementOptions options)
+ throws SQLException {
HttpPost httpPost = new HttpPost(queryEndpoint);
httpPost.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON
.withParameters(new BasicNameValuePair(FORMAT_LOSSLESS_ADM, Boolean.TRUE.toString())).toString());
@@ -255,17 +263,23 @@
jsonGen.writeBooleanField(SIGNATURE, true);
jsonGen.writeStringField(PLAN_FORMAT, PLAN_FORMAT_STRING);
jsonGen.writeNumberField(MAX_WARNINGS, maxWarnings);
- if (compileOnly) {
+ if (options.compileOnly) {
jsonGen.writeBooleanField(COMPILE_ONLY, true);
}
- if (forceReadOnly) {
+ if (options.forceReadOnly) {
jsonGen.writeBooleanField(READ_ONLY, true);
}
- if (timeoutSeconds > 0) {
- jsonGen.writeStringField(TIMEOUT, timeoutSeconds + "s");
+ if (options.sqlCompatMode) {
+ jsonGen.writeBooleanField(SQL_COMPAT, true);
}
- if (catalog != null) {
- jsonGen.writeStringField(DATAVERSE, schema != null ? catalog + "/" + schema : catalog);
+ if (options.timeoutSeconds > 0) {
+ jsonGen.writeStringField(TIMEOUT, options.timeoutSeconds + "s");
+ }
+ if (options.dataverseName != null) {
+ jsonGen.writeStringField(DATAVERSE, options.dataverseName);
+ }
+ if (executionId != null) {
+ jsonGen.writeStringField(CLIENT_CONTEXT_ID, executionId.toString());
}
if (args != null && !args.isEmpty()) {
jsonGen.writeFieldName(ARGS);
@@ -279,11 +293,9 @@
throw getErrorReporter().errorInRequestGeneration(e);
}
- System.err.printf("<ADB_DRIVER_SQL>%n%s%n</ADB_DRIVER_SQL>%n", sql);
-
if (getLogger().isLoggable(Level.FINE)) {
- getLogger().log(Level.FINE, String.format("%s { %s } with args { %s }", compileOnly ? "compile" : "execute",
- sql, args != null ? args : ""));
+ getLogger().log(Level.FINE, String.format("%s { %s } with args { %s }",
+ options.compileOnly ? "compile" : "execute", sql, args != null ? args : ""));
}
httpPost.setEntity(new EntityTemplateImpl(baos, ContentType.APPLICATION_JSON));
@@ -296,6 +308,21 @@
}
}
+ public static class SubmitStatementOptions {
+ public String dataverseName;
+ public int timeoutSeconds;
+ public boolean forceReadOnly;
+ public boolean compileOnly;
+ public boolean sqlCompatMode;
+
+ private SubmitStatementOptions() {
+ }
+ }
+
+ SubmitStatementOptions createSubmitStatementOptions() {
+ return new SubmitStatementOptions();
+ }
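
For orientation, a comment sketch of how a submission is now parameterized inside the driver; the option values are illustrative:

// SubmitStatementOptions opts = createSubmitStatementOptions();
// opts.dataverseName  -> dataverse of the request (omitted when null)
// opts.timeoutSeconds -> omitted when 0
// opts.forceReadOnly  -> used on the query execution path
// opts.compileOnly    -> used by ADBPreparedStatement to compile without executing
// opts.sqlCompatMode  -> serialized as "sql-compat": true
// The executionId (a java.util.UUID) is serialized as "client_context_id"; passing the same id to
// cancelStatementExecution(executionId) issues an HTTP DELETE on /admin/requests/running.
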
+
private QueryServiceResponse handlePostQueryResponse(CloseableHttpResponse httpResponse)
throws SQLException, IOException {
int httpStatus = httpResponse.getStatusLine().getStatusCode();
@@ -419,6 +446,33 @@
}
}
+ void cancelStatementExecution(UUID executionId) throws SQLException {
+ HttpDelete httpDelete;
+ try {
+ URIBuilder uriBuilder = new URIBuilder(activeRequestsEndpoint);
+ uriBuilder.setParameter(CLIENT_CONTEXT_ID, String.valueOf(executionId));
+ httpDelete = new HttpDelete(uriBuilder.build());
+ } catch (URISyntaxException e) {
+ throw getErrorReporter().errorInRequestURIGeneration(e);
+ }
+
+ try (CloseableHttpResponse httpResponse = httpClient.execute(httpDelete, httpClientContext)) {
+ int httpStatus = httpResponse.getStatusLine().getStatusCode();
+ switch (httpStatus) {
+ case HttpStatus.SC_OK:
+ case HttpStatus.SC_NOT_FOUND:
+ break;
+ case HttpStatus.SC_UNAUTHORIZED:
+ case HttpStatus.SC_FORBIDDEN:
+ throw getErrorReporter().errorAuth();
+ default:
+ throw getErrorReporter().errorInProtocol(httpResponse.getStatusLine().toString());
+ }
+ } catch (IOException e) {
+ throw getErrorReporter().errorInConnection(e);
+ }
+ }
+
private HttpClientContext createHttpClientContext(URI uri) {
HttpClientContext hcCtx = HttpClientContext.create();
AuthCache ac = new BasicAuthCache();
@@ -542,6 +596,18 @@
return om;
}
+ protected String getQueryServiceEndpointPath() {
+ return QUERY_SERVICE_ENDPOINT_PATH;
+ }
+
+ protected String getQueryResultEndpointPath() {
+ return QUERY_RESULT_ENDPOINT_PATH;
+ }
+
+ protected String getActiveRequestsEndpointPath() {
+ return ACTIVE_REQUESTS_ENDPOINT_PATH;
+ }
+
private static void closeQuietly(Exception mainExc, java.io.Closeable... closeableList) {
for (Closeable closeable : closeableList) {
if (closeable != null) {
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
index da9c5bd..800aeee 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
@@ -35,8 +35,10 @@
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Period;
+import java.time.ZoneId;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Calendar;
@@ -45,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -76,12 +79,16 @@
static final List<Class<?>> GET_OBJECT_NON_ATOMIC = Arrays.asList(Collection.class, List.class, Map.class);
+ private static final ZoneId TZ_UTC = ZoneId.of("UTC");
+
private final ADBResultSet resultSet;
private final ADBDatatype[] columnTypes;
private final Object[] objectStore;
private final long[] registerStore; // 2 registers per column
+ private final TimeZone tzSystem = TimeZone.getDefault();
+
private int parsedLength;
private long currentDateChronon;
private JsonGenerator jsonGen;
@@ -591,16 +598,15 @@
}
Date getDate(int columnIndex, Calendar cal) throws SQLException {
- // TODO:cal is not used
ADBDatatype valueType = getColumnType(columnIndex);
switch (valueType) {
case MISSING:
case NULL:
return null;
case DATE:
- return toDateFromDateChronon(getColumnRegister(columnIndex, 0));
+ return toDateFromDateChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
case DATETIME:
- return toDateFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+ return toDateFromDatetimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
case STRING:
try {
LocalDate d = LocalDate.parse(getStringFromObjectStore(columnIndex)); // TODO:review
@@ -639,20 +645,20 @@
}
Time getTime(int columnIndex, Calendar cal) throws SQLException {
- // TODO:cal not used
ADBDatatype valueType = getColumnType(columnIndex);
switch (valueType) {
case MISSING:
case NULL:
return null;
case TIME:
- return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+ return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
case DATETIME:
- return toTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+ return toTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
case STRING:
try {
LocalTime t = LocalTime.parse(getStringFromObjectStore(columnIndex)); // TODO:review
- return toTimeFromTimeChronon(TimeUnit.NANOSECONDS.toMillis(t.toNanoOfDay()));
+ return toTimeFromTimeChronon(TimeUnit.NANOSECONDS.toMillis(t.toNanoOfDay()),
+ getTimeZone(cal, tzSystem));
} catch (DateTimeParseException e) {
throw getErrorReporter().errorInvalidValueOfType(valueType);
}
@@ -687,16 +693,15 @@
}
Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
- //TODO:FIXME:CAL NOT USED
ADBDatatype valueType = getColumnType(columnIndex);
switch (valueType) {
case MISSING:
case NULL:
return null;
case DATE:
- return toTimestampFromDateChronon(getColumnRegister(columnIndex, 0));
+ return toTimestampFromDateChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
case DATETIME:
- return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+ return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
case STRING:
try {
Instant i = Instant.parse(getStringFromObjectStore(columnIndex));
@@ -711,19 +716,19 @@
}
}
- Instant getInstant(int columnIndex) throws SQLException {
+ LocalDateTime getLocalDateTime(int columnIndex) throws SQLException {
ADBDatatype valueType = getColumnType(columnIndex);
switch (valueType) {
case MISSING:
case NULL:
return null;
case DATE:
- return toInstantFromDateChronon(getColumnRegister(columnIndex, 0));
+ return toLocalDateTimeFromDateChronon(getColumnRegister(columnIndex, 0));
case DATETIME:
- return toInstantFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+ return toLocalDateTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0));
case STRING:
try {
- return Instant.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+ return LocalDateTime.parse(getStringFromObjectStore(columnIndex)); // TODO:review
} catch (DateTimeParseException e) {
throw getErrorReporter().errorInvalidValueOfType(valueType);
}
@@ -822,7 +827,7 @@
case TIME:
return toLocalTimeFromTimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
case DATETIME:
- return toInstantFromDatetimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
+ return toLocalDateTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
case YEARMONTHDURATION:
return getPeriodFromObjectStore(columnIndex).toString(); // TODO:review
case DAYTIMEDURATION:
@@ -883,11 +888,11 @@
case DOUBLE:
return getNumberFromObjectStore(columnIndex);
case DATE:
- return toDateFromDateChronon(getColumnRegister(columnIndex, 0));
+ return toDateFromDateChronon(getColumnRegister(columnIndex, 0), tzSystem);
case TIME:
- return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+ return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0), tzSystem);
case DATETIME:
- return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+ return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0), tzSystem);
case YEARMONTHDURATION:
return getPeriodFromObjectStore(columnIndex);
case DAYTIMEDURATION:
@@ -952,7 +957,7 @@
map.put(Time.class, ADBRowStore::getTime);
map.put(LocalTime.class, ADBRowStore::getLocalTime);
map.put(Timestamp.class, ADBRowStore::getTimestamp);
- map.put(Instant.class, ADBRowStore::getInstant);
+ map.put(LocalDateTime.class, ADBRowStore::getLocalDateTime);
map.put(Period.class, ADBRowStore::getPeriod);
map.put(Duration.class, ADBRowStore::getDuration);
map.put(UUID.class, ADBRowStore::getUUID);
@@ -960,12 +965,12 @@
return map;
}
- private Date toDateFromDateChronon(long dateChrononInDays) {
- return new Date(TimeUnit.DAYS.toMillis(dateChrononInDays));
+ private Date toDateFromDateChronon(long dateChrononInDays, TimeZone tz) {
+ return new Date(getDatetimeChrononAdjusted(TimeUnit.DAYS.toMillis(dateChrononInDays), tz));
}
- private Date toDateFromDatetimeChronon(long datetimeChrononInMillis) {
- return new Date(datetimeChrononInMillis);
+ private Date toDateFromDatetimeChronon(long datetimeChrononInMillis, TimeZone tz) {
+ return new Date(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
}
private LocalDate toLocalDateFromDateChronon(long dateChrononInDays) {
@@ -976,13 +981,13 @@
return LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(datetimeChrononInMillis));
}
- private Time toTimeFromTimeChronon(long timeChrononInMillis) {
+ private Time toTimeFromTimeChronon(long timeChrononInMillis, TimeZone tz) {
long datetimeChrononInMillis = getCurrentDateChrononInMillis() + timeChrononInMillis;
- return new Time(datetimeChrononInMillis);
+ return new Time(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
}
- private Time toTimeFromDatetimeChronon(long datetimeChrononInMillis) {
- return new Time(datetimeChrononInMillis);
+ private Time toTimeFromDatetimeChronon(long datetimeChrononInMillis, TimeZone tz) {
+ return new Time(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
}
private LocalTime toLocalTimeFromTimeChronon(long timeChrononInMillis) {
@@ -993,30 +998,38 @@
return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(datetimeChrononInMillis));
}
- private Timestamp toTimestampFromDatetimeChronon(long datetimeChrononInMillis) {
- return new Timestamp(datetimeChrononInMillis);
+ private Timestamp toTimestampFromDatetimeChronon(long datetimeChrononInMillis, TimeZone tz) {
+ return new Timestamp(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
}
- private Timestamp toTimestampFromDateChronon(long dateChrononInDays) {
- return new Timestamp(TimeUnit.DAYS.toMillis(dateChrononInDays));
+ private Timestamp toTimestampFromDateChronon(long dateChrononInDays, TimeZone tz) {
+ return new Timestamp(getDatetimeChrononAdjusted(TimeUnit.DAYS.toMillis(dateChrononInDays), tz));
}
- private Instant toInstantFromDatetimeChronon(long datetimeChrononInMillis) {
- return Instant.ofEpochMilli(datetimeChrononInMillis);
+ private LocalDateTime toLocalDateTimeFromDatetimeChronon(long datetimeChrononInMillis) {
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(datetimeChrononInMillis), TZ_UTC);
}
- private Instant toInstantFromDateChronon(long dateChrononInDays) {
- return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(dateChrononInDays));
+ private LocalDateTime toLocalDateTimeFromDateChronon(long dateChrononInDays) {
+ return LocalDate.ofEpochDay(dateChrononInDays).atStartOfDay();
+ }
+
+ private long getDatetimeChrononAdjusted(long datetimeChrononInMillis, TimeZone tz) {
+ int tzOffset = tz.getOffset(datetimeChrononInMillis);
+ return datetimeChrononInMillis - tzOffset;
}
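
A worked example of the adjustment above (values illustrative): the chronon is treated as a UTC wall-clock value, and subtracting the zone offset shifts it so the same wall-clock fields come out in the requested zone.

// Stored DATE 2020-01-15:
//   chronon (days since epoch)          -> 2020-01-15T00:00Z in millis
//   requested zone America/Los_Angeles  -> offset -08:00 at that instant
//   adjusted = millis - (-8h)           =  2020-01-15T08:00Z
//   new java.sql.Date(adjusted)         -> midnight of 2020-01-15 in the requested zone
// When no Calendar is supplied, the JVM default zone (tzSystem) is used.
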
private long getCurrentDateChrononInMillis() {
if (currentDateChronon == 0) {
- long chrononOfDay = TimeUnit.DAYS.toMillis(1);
- currentDateChronon = System.currentTimeMillis() / chrononOfDay * chrononOfDay;
+ currentDateChronon = TimeUnit.DAYS.toMillis(TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()));
}
return currentDateChronon;
}
+ private TimeZone getTimeZone(Calendar cal, TimeZone tzDefault) {
+ return cal != null ? cal.getTimeZone() : tzDefault;
+ }
+
private String printAsJson(Object value) throws SQLException {
if (jsonGenBuffer == null) {
jsonGenBuffer = new StringWriter();
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
index 94ac5a3..61072e8 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
@@ -31,10 +31,11 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
-import java.time.Instant;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Period;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -43,6 +44,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -63,19 +65,21 @@
class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
+ static final List<Class<?>> SET_OBJECT_ATOMIC_EXTRA =
+ Arrays.asList(SqlCalendarDate.class, SqlCalendarTime.class, SqlCalendarTimestamp.class);
+
static final List<Class<?>> SET_OBJECT_NON_ATOMIC = Arrays.asList(Object[].class, Collection.class, Map.class);
static final Map<Class<?>, AbstractValueSerializer> SERIALIZER_MAP = createSerializerMap();
protected final ADBConnection connection;
- protected final String catalog;
- protected final String schema;
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected volatile boolean closeOnCompletion;
protected int queryTimeoutSeconds;
protected long maxRows;
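+ // Identifies the currently executing request so cancel() can target it; rotated after every execution.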
+ private volatile UUID executionId;
// common result fields
protected int updateCount = -1;
@@ -91,12 +95,11 @@
// Lifecycle
- ADBStatement(ADBConnection connection, String catalog, String schema) {
+ ADBStatement(ADBConnection connection) {
this.connection = Objects.requireNonNull(connection);
- this.catalog = catalog;
- this.schema = schema;
this.resultSetsWithResources = new ConcurrentLinkedQueue<>();
this.resultSetsWithoutResources = new ConcurrentLinkedQueue<>();
+ resetExecutionId();
}
@Override
@@ -153,16 +156,22 @@
protected ADBResultSet executeQueryImpl(String sql, List<?> args) throws SQLException {
// note: we're not assigning executeResponse field at this method
- ADBProtocol.QueryServiceResponse response =
- connection.protocol.submitStatement(sql, args, true, false, queryTimeoutSeconds, catalog, schema);
- boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
- if (!isQuery) {
- throw getErrorReporter().errorInvalidStatementCategory();
+ try {
+ ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
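+ // executeQuery() requests read-only execution; non-query statements are rejected below.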
+ stmtOptions.forceReadOnly = true;
+ ADBProtocol.QueryServiceResponse response =
+ connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
+ boolean isQuery = connection.protocol.isStatementCategory(response,
+ ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ if (!isQuery) {
+ throw getErrorReporter().errorInvalidStatementCategory();
+ }
+ warnings = connection.protocol.getWarningIfExists(response);
+ updateCount = -1;
+ return fetchResultSet(response);
+ } finally {
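+ // Rotate the execution id even on failure so a later cancel() cannot hit a finished request.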
+ resetExecutionId();
}
- warnings = connection.protocol.getWarningIfExists(response);
- updateCount = -1;
- return fetchResultSet(response);
}
@Override
@@ -208,17 +217,22 @@
}
protected int executeUpdateImpl(String sql, List<Object> args) throws SQLException {
- ADBProtocol.QueryServiceResponse response =
- connection.protocol.submitStatement(sql, args, false, false, queryTimeoutSeconds, catalog, schema);
- boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
- // TODO: remove result set on the server (both query and update returning cases)
- if (isQuery) {
- throw getErrorReporter().errorInvalidStatementCategory();
+ try {
+ ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ ADBProtocol.QueryServiceResponse response =
+ connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
+ boolean isQuery = connection.protocol.isStatementCategory(response,
+ ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ // TODO: remove result set on the server (both query and update returning cases)
+ if (isQuery) {
+ throw getErrorReporter().errorInvalidStatementCategory();
+ }
+ warnings = connection.protocol.getWarningIfExists(response);
+ updateCount = connection.protocol.getUpdateCount(response);
+ return updateCount;
+ } finally {
+ resetExecutionId();
}
- warnings = connection.protocol.getWarningIfExists(response);
- updateCount = connection.protocol.getUpdateCount(response);
- return updateCount;
}
@Override
@@ -243,25 +257,31 @@
}
protected boolean executeImpl(String sql, List<Object> args) throws SQLException {
- ADBProtocol.QueryServiceResponse response =
- connection.protocol.submitStatement(sql, args, false, false, queryTimeoutSeconds, catalog, schema);
- warnings = connection.protocol.getWarningIfExists(response);
- boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
- if (isQuery) {
- updateCount = -1;
- executeResponse = response;
- return true;
- } else {
- updateCount = connection.protocol.getUpdateCount(response);
- executeResponse = null;
- return false;
+ try {
+ ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ ADBProtocol.QueryServiceResponse response =
+ connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
+ warnings = connection.protocol.getWarningIfExists(response);
+ boolean isQuery = connection.protocol.isStatementCategory(response,
+ ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ if (isQuery) {
+ updateCount = -1;
+ executeResponse = response;
+ return true;
+ } else {
+ updateCount = connection.protocol.getUpdateCount(response);
+ executeResponse = null;
+ return false;
+ }
+ } finally {
+ resetExecutionId();
}
}
@Override
public void cancel() throws SQLException {
- throw getErrorReporter().errorMethodNotSupported(Statement.class, "cancel");
+ checkClosed();
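+ // Ask the server to abort the request identified by the current execution id.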
+ connection.protocol.cancelStatementExecution(executionId);
}
@Override
@@ -284,6 +304,18 @@
checkClosed();
}
+ private void resetExecutionId() {
+ executionId = UUID.randomUUID();
+ }
+
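+ // Per-execution options derived from connection-level settings (active dataverse, SQL compat mode, query timeout).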
+ protected ADBProtocol.SubmitStatementOptions createSubmitStatementOptions() {
+ ADBProtocol.SubmitStatementOptions stmtOptions = connection.protocol.createSubmitStatementOptions();
+ stmtOptions.dataverseName = connection.getDataverseCanonicalName();
+ stmtOptions.sqlCompatMode = connection.sqlCompatMode;
+ stmtOptions.timeoutSeconds = queryTimeoutSeconds;
+ return stmtOptions;
+ }
+
// Batch execution
@Override
@@ -635,7 +667,7 @@
}
static boolean isSetObjectCompatible(Class<?> cls) {
- if (ADBRowStore.OBJECT_ACCESSORS_ATOMIC.containsKey(cls)) {
+ if (ADBRowStore.OBJECT_ACCESSORS_ATOMIC.containsKey(cls) || SET_OBJECT_ATOMIC_EXTRA.contains(cls)) {
return true;
}
for (Class<?> aClass : SET_OBJECT_NON_ATOMIC) {
@@ -657,11 +689,14 @@
registerSerializer(serializerMap, createDoubleSerializer());
registerSerializer(serializerMap, createStringSerializer());
registerSerializer(serializerMap, createSqlDateSerializer());
+ registerSerializer(serializerMap, createSqlDateWithCalendarSerializer());
registerSerializer(serializerMap, createLocalDateSerializer());
registerSerializer(serializerMap, createSqlTimeSerializer());
+ registerSerializer(serializerMap, createSqlCalendarTimeSerializer());
registerSerializer(serializerMap, createLocalTimeSerializer());
registerSerializer(serializerMap, createSqlTimestampSerializer());
- registerSerializer(serializerMap, createInstantSerializer());
+ registerSerializer(serializerMap, createSqlCalendarTimestampSerializer());
+ registerSerializer(serializerMap, createLocalDateTimeSerializer());
registerSerializer(serializerMap, createPeriodSerializer());
registerSerializer(serializerMap, createDurationSerializer());
return serializerMap;
@@ -720,7 +755,22 @@
@Override
protected void serializeNonTaggedValue(Object value, StringBuilder out) {
long millis = ((Date) value).getTime();
- out.append(millis);
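+ // DATE values are serialized as days since the epoch, computed from the wall-clock date in the JVM default zone.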
+ long millisAdjusted = getDatetimeChrononAdjusted(millis, TimeZone.getDefault());
+ long days = TimeUnit.MILLISECONDS.toDays(millisAdjusted);
+ out.append(days);
+ }
+ };
+ }
+
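+ // Same as the java.sql.Date serializer, but uses the time zone captured from the Calendar-based setter.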
+ private static ATaggedValueSerializer createSqlDateWithCalendarSerializer() {
+ return new ATaggedValueSerializer(SqlCalendarDate.class, ADBDatatype.DATE) {
+ @Override
+ protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+ SqlCalendarDate dateWithCalendar = (SqlCalendarDate) value;
+ long millis = dateWithCalendar.date.getTime();
+ long millisAdjusted = getDatetimeChrononAdjusted(millis, dateWithCalendar.timeZone);
+ long days = TimeUnit.MILLISECONDS.toDays(millisAdjusted);
+ out.append(days);
}
};
}
@@ -729,8 +779,8 @@
return new ATaggedValueSerializer(java.time.LocalDate.class, ADBDatatype.DATE) {
@Override
protected void serializeNonTaggedValue(Object value, StringBuilder out) {
- long millis = TimeUnit.DAYS.toMillis(((LocalDate) value).toEpochDay());
- out.append(millis);
+ long days = ((LocalDate) value).toEpochDay();
+ out.append(days);
}
};
}
@@ -740,7 +790,22 @@
@Override
protected void serializeNonTaggedValue(Object value, StringBuilder out) {
long millis = ((Time) value).getTime();
- out.append(millis);
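+ // TIME values are serialized as milliseconds within the day, after shifting to wall-clock time in the JVM default zone.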
+ long millisAdjusted = getDatetimeChrononAdjusted(millis, TimeZone.getDefault());
+ long timeMillis = millisAdjusted - TimeUnit.DAYS.toMillis(TimeUnit.MILLISECONDS.toDays(millisAdjusted));
+ out.append(timeMillis);
+ }
+ };
+ }
+
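+ // Same as the java.sql.Time serializer, but uses the time zone captured from the Calendar-based setter.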
+ private static ATaggedValueSerializer createSqlCalendarTimeSerializer() {
+ return new ATaggedValueSerializer(SqlCalendarTime.class, ADBDatatype.TIME) {
+ @Override
+ protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+ SqlCalendarTime timeWithCalendar = (SqlCalendarTime) value;
+ long millis = timeWithCalendar.time.getTime();
+ long millisAdjusted = getDatetimeChrononAdjusted(millis, timeWithCalendar.timeZone);
+ long timeMillis = millisAdjusted - TimeUnit.DAYS.toMillis(TimeUnit.MILLISECONDS.toDays(millisAdjusted));
+ out.append(timeMillis);
}
};
}
@@ -749,8 +814,9 @@
return new ATaggedValueSerializer(java.time.LocalTime.class, ADBDatatype.TIME) {
@Override
protected void serializeNonTaggedValue(Object value, StringBuilder out) {
- long millis = TimeUnit.NANOSECONDS.toMillis(((LocalTime) value).toNanoOfDay());
- out.append(millis);
+ long nanos = ((LocalTime) value).toNanoOfDay();
+ long timeMillis = TimeUnit.NANOSECONDS.toMillis(nanos);
+ out.append(timeMillis);
}
};
}
@@ -760,16 +826,29 @@
@Override
protected void serializeNonTaggedValue(Object value, StringBuilder out) {
long millis = ((Timestamp) value).getTime();
- out.append(millis);
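+ // DATETIME values are serialized as wall-clock epoch millis in the JVM default zone.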
+ long millisAdjusted = getDatetimeChrononAdjusted(millis, TimeZone.getDefault());
+ out.append(millisAdjusted);
}
};
}
- private static ATaggedValueSerializer createInstantSerializer() {
- return new ATaggedValueSerializer(java.time.Instant.class, ADBDatatype.DATETIME) {
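+ // Like the java.sql.Timestamp serializer, but adjusts with the time zone captured from the Calendar-based setter.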
+ private static ATaggedValueSerializer createSqlCalendarTimestampSerializer() {
+ return new ATaggedValueSerializer(SqlCalendarTimestamp.class, ADBDatatype.DATETIME) {
@Override
protected void serializeNonTaggedValue(Object value, StringBuilder out) {
- long millis = ((Instant) value).toEpochMilli();
+ SqlCalendarTimestamp timestampWithCalendar = (SqlCalendarTimestamp) value;
+ long millis = timestampWithCalendar.timestamp.getTime();
+ long millisAdjusted = getDatetimeChrononAdjusted(millis, timestampWithCalendar.timeZone);
+ out.append(millisAdjusted);
+ }
+ };
+ }
+
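+ // LocalDateTime carries no zone; it is interpreted at UTC so it round-trips with toLocalDateTimeFromDatetimeChronon.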
+ private static ATaggedValueSerializer createLocalDateTimeSerializer() {
+ return new ATaggedValueSerializer(java.time.LocalDateTime.class, ADBDatatype.DATETIME) {
+ @Override
+ protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+ long millis = ((LocalDateTime) value).atZone(TZ_UTC).toInstant().toEpochMilli();
out.append(millis);
}
};
@@ -812,6 +891,8 @@
private static abstract class ATaggedValueSerializer extends AbstractValueSerializer {
+ protected static final ZoneId TZ_UTC = ZoneId.of("UTC");
+
protected final ADBDatatype adbType;
protected ATaggedValueSerializer(Class<?> javaType, ADBDatatype adbType) {
@@ -842,5 +923,46 @@
private static char hex(int i) {
return (char) (i + (i < 10 ? '0' : ('A' - 10)));
}
+
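+ // Inverse of the adjustment applied on read: adding the zone offset turns epoch millis into the wall-clock chronon sent to the server.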
+ protected long getDatetimeChrononAdjusted(long datetimeChrononInMillis, TimeZone tz) {
+ int tzOffset = tz.getOffset(datetimeChrononInMillis);
+ return datetimeChrononInMillis + tzOffset;
+ }
+ }
+
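+ // Wrappers pairing a java.sql temporal value with a caller-supplied time zone (used by the Calendar-based setter overloads).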
+ static abstract class AbstractSqlCalendarDateTime {
+ final TimeZone timeZone;
+
+ AbstractSqlCalendarDateTime(TimeZone timeZone) {
+ this.timeZone = timeZone;
+ }
+ }
+
+ static final class SqlCalendarDate extends AbstractSqlCalendarDateTime {
+ final Date date;
+
+ SqlCalendarDate(Date date, TimeZone timeZone) {
+ super(timeZone);
+ this.date = date;
+ }
+ }
+
+ static final class SqlCalendarTime extends AbstractSqlCalendarDateTime {
+ final Time time;
+
+ SqlCalendarTime(Time time, TimeZone timeZone) {
+ super(timeZone);
+ this.time = time;
+ }
+ }
+
+ static final class SqlCalendarTimestamp extends AbstractSqlCalendarDateTime {
+ final Timestamp timestamp;
+
+ SqlCalendarTimestamp(Timestamp timestamp, TimeZone timeZone) {
+ super(timeZone);
+ this.timestamp = timestamp;
+ }
}
}