[NO ISSUE][JDBC] Refactor ADBProtocol
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce ADBProtocolBase - base class for ADBProtocol
Change-Id: I6fc2479e278aca3ab7bf415c249674e0708799a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb-clients/+/13663
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
index 3a0ae5e..c9d78f7 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
@@ -48,7 +48,7 @@
public class ADBConnection extends ADBWrapperSupport implements Connection {
- final ADBProtocol protocol;
+ final ADBProtocolBase protocol;
final String url;
final String databaseVersion;
final ADBDriverProperty.CatalogDataverseMode catalogDataverseMode;
@@ -63,7 +63,7 @@
// Lifecycle
- protected ADBConnection(ADBProtocol protocol, String url, String databaseVersion, String dataverseCanonicalName,
+ protected ADBConnection(ADBProtocolBase protocol, String url, String databaseVersion, String dataverseCanonicalName,
Map<ADBDriverProperty, Object> properties, SQLWarning connectWarning) throws SQLException {
this.url = Objects.requireNonNull(url);
this.protocol = Objects.requireNonNull(protocol);
@@ -72,22 +72,22 @@
this.warning = connectWarning;
this.closed = new AtomicBoolean(false);
this.sqlCompatMode = (Boolean) ADBDriverProperty.Common.SQL_COMPAT_MODE.fetchPropertyValue(properties);
- this.catalogDataverseMode = getCatalogDataverseMode(protocol, properties);
+ this.catalogDataverseMode = getCatalogDataverseMode(properties, protocol.getErrorReporter());
this.catalogIncludesSchemaless =
(Boolean) ADBDriverProperty.Common.CATALOG_INCLUDES_SCHEMALESS.fetchPropertyValue(properties);
initCatalogSchema(protocol, dataverseCanonicalName);
}
- private void initCatalogSchema(ADBProtocol protocol, String dataverseCanonicalName) throws SQLException {
+ private void initCatalogSchema(ADBProtocolBase protocol, String dataverseCanonicalName) throws SQLException {
switch (catalogDataverseMode) {
case CATALOG:
catalog = dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()
- ? ADBProtocol.DEFAULT_DATAVERSE : dataverseCanonicalName;
+ ? protocol.getDefaultDataverse() : dataverseCanonicalName;
// schema = null
break;
case CATALOG_SCHEMA:
if (dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()) {
- catalog = ADBProtocol.DEFAULT_DATAVERSE;
+ catalog = protocol.getDefaultDataverse();
// schema = null
} else {
String[] parts = dataverseCanonicalName.split("/");
@@ -377,13 +377,13 @@
}
}
- private static ADBDriverProperty.CatalogDataverseMode getCatalogDataverseMode(ADBProtocol protocol,
- Map<ADBDriverProperty, Object> properties) throws SQLException {
+ private static ADBDriverProperty.CatalogDataverseMode getCatalogDataverseMode(
+ Map<ADBDriverProperty, Object> properties, ADBErrorReporter errorReporter) throws SQLException {
int mode = ((Number) ADBDriverProperty.Common.CATALOG_DATAVERSE_MODE.fetchPropertyValue(properties)).intValue();
try {
return ADBDriverProperty.CatalogDataverseMode.valueOf(mode);
} catch (IllegalArgumentException e) {
- throw protocol.getErrorReporter().errorInConnection(String.valueOf(mode)); //TODO:FIXME
+ throw errorReporter.errorInConnection(String.valueOf(mode)); //TODO:FIXME
}
}
@@ -579,11 +579,11 @@
@Override
protected ADBErrorReporter getErrorReporter() {
- return protocol.driverContext.errorReporter;
+ return protocol.getErrorReporter();
}
protected Logger getLogger() {
- return protocol.driverContext.logger;
+ return protocol.getLogger();
}
// Miscellaneous unsupported features (error is raised)
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
index 1d463dc..3e8cfc4 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
@@ -48,22 +48,22 @@
@Override
public String getDriverName() {
- return metaStatement.connection.protocol.driverContext.driverVersion.productName;
+ return metaStatement.connection.protocol.getDriverContext().getDriverVersion().productName;
}
@Override
public String getDriverVersion() {
- return metaStatement.connection.protocol.driverContext.driverVersion.productVersion;
+ return metaStatement.connection.protocol.getDriverContext().getDriverVersion().productVersion;
}
@Override
public int getDriverMajorVersion() {
- return metaStatement.connection.protocol.driverContext.driverVersion.majorVersion;
+ return metaStatement.connection.protocol.getDriverContext().getDriverVersion().majorVersion;
}
@Override
public int getDriverMinorVersion() {
- return metaStatement.connection.protocol.driverContext.driverVersion.minorVersion;
+ return metaStatement.connection.protocol.getDriverContext().getDriverVersion().minorVersion;
}
@Override
@@ -895,7 +895,7 @@
@Override
public String getUserName() {
- return metaStatement.connection.protocol.user;
+ return metaStatement.connection.protocol.getUser();
}
@Override
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
index 46f9705..904c18b 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
@@ -91,9 +91,10 @@
private static void parseConnectionProperty(String name, String textValue, ADBDriverContext driverContext,
Map<ADBDriverProperty, Object> outProperties, SQLWarning outWarning) throws SQLException {
- ADBDriverProperty property = driverContext.supportedProperties.get(name);
+ ADBDriverProperty property = driverContext.getSupportedProperties().get(name);
if (property == null) {
- outWarning.setNextWarning(new SQLWarning(driverContext.errorReporter.warningParameterNotSupported(name)));
+ outWarning.setNextWarning(
+ new SQLWarning(driverContext.getErrorReporter().warningParameterNotSupported(name)));
return;
}
if (textValue == null || textValue.isEmpty()) {
@@ -103,7 +104,7 @@
try {
value = Objects.requireNonNull(property.getValueParser().apply(textValue));
} catch (RuntimeException e) {
- throw driverContext.errorReporter.errorParameterValueNotSupported(name);
+ throw driverContext.getErrorReporter().errorParameterValueNotSupported(name);
}
outProperties.put(property, value);
}
@@ -167,7 +168,7 @@
String dataverseCanonicalName =
path != null && path.length() > 1 && path.startsWith("/") ? path.substring(1) : null;
- ADBProtocol protocol = createProtocol(host, port, properties, driverContext);
+ ADBProtocolBase protocol = createProtocol(host, port, properties, driverContext);
try {
String databaseVersion = protocol.connect();
return createConnection(protocol, url, databaseVersion, dataverseCanonicalName, properties, warning);
@@ -182,7 +183,8 @@
}
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
- Collection<ADBDriverProperty> supportedProperties = getOrCreateDriverContext().supportedProperties.values();
+ Collection<ADBDriverProperty> supportedProperties =
+ getOrCreateDriverContext().getSupportedProperties().values();
List<DriverPropertyInfo> result = new ArrayList<>(supportedProperties.size());
for (ADBDriverProperty property : supportedProperties) {
if (property.isHidden()) {
@@ -197,11 +199,11 @@
}
public int getMajorVersion() {
- return getOrCreateDriverContext().driverVersion.majorVersion;
+ return getOrCreateDriverContext().getDriverVersion().majorVersion;
}
public int getMinorVersion() {
- return getOrCreateDriverContext().driverVersion.minorVersion;
+ return getOrCreateDriverContext().getDriverVersion().minorVersion;
}
public boolean jdbcCompliant() {
@@ -237,12 +239,10 @@
return new ADBErrorReporter();
}
- protected ADBProtocol createProtocol(String host, int port, Map<ADBDriverProperty, Object> properties,
- ADBDriverContext driverContext) throws SQLException {
- return new ADBProtocol(host, port, properties, driverContext);
- }
+ protected abstract ADBProtocolBase createProtocol(String host, int port, Map<ADBDriverProperty, Object> properties,
+ ADBDriverContext driverContext) throws SQLException;
- protected ADBConnection createConnection(ADBProtocol protocol, String url, String databaseVersion,
+ protected ADBConnection createConnection(ADBProtocolBase protocol, String url, String databaseVersion,
String dataverseCanonicalName, Map<ADBDriverProperty, Object> properties, SQLWarning connectWarning)
throws SQLException {
return new ADBConnection(protocol, url, databaseVersion, dataverseCanonicalName, properties, connectWarning);
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
index 4b57f7d..3a7401f 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
@@ -26,30 +26,35 @@
import java.util.Objects;
import java.util.logging.Logger;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
public class ADBDriverContext {
- final Class<? extends ADBDriverBase> driverClass;
+ private final Class<? extends ADBDriverBase> driverClass;
- final ADBErrorReporter errorReporter;
+ private final ADBErrorReporter errorReporter;
- final ADBProductVersion driverVersion;
+ private final ADBProductVersion driverVersion;
- final Map<String, ADBDriverProperty> supportedProperties;
+ private final Map<String, ADBDriverProperty> supportedProperties;
- final Logger logger;
+ private final Logger logger;
- final ObjectReader genericObjectReader;
+ private final ObjectReader genericObjectReader;
- final ObjectWriter genericObjectWriter;
+ private final ObjectWriter genericObjectWriter;
- final ObjectReader admFormatObjectReader;
+ private final ObjectReader admFormatObjectReader;
- final ObjectWriter admFormatObjectWriter;
+ private final ObjectWriter admFormatObjectWriter;
ADBDriverContext(Class<? extends ADBDriverBase> driverClass,
Collection<ADBDriverProperty> driverSupportedProperties, ADBErrorReporter errorReporter) {
@@ -59,7 +64,7 @@
this.driverVersion = ADBProductVersion.parseDriverVersion(driverClass.getPackage());
this.supportedProperties = createPropertyIndexByName(driverSupportedProperties);
- ObjectMapper genericObjectMapper = ADBProtocol.createObjectMapper();
+ ObjectMapper genericObjectMapper = createGenericObjectMapper();
this.genericObjectReader = genericObjectMapper.reader();
this.genericObjectWriter = genericObjectMapper.writer();
ObjectMapper admFormatObjectMapper = createADMFormatObjectMapper();
@@ -67,11 +72,23 @@
this.admFormatObjectWriter = admFormatObjectMapper.writer();
}
+ protected ObjectMapper createGenericObjectMapper() {
+ ObjectMapper om = new ObjectMapper();
+ om.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.NON_PRIVATE);
+ // serialization
+ om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+ // deserialization
+ om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ om.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+ om.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
+ return om;
+ }
+
protected ObjectMapper createADMFormatObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
SimpleModule serdeModule = new SimpleModule(driverClass.getName());
- ADBStatement.configureSerialization(serdeModule);
- ADBRowStore.configureDeserialization(mapper, serdeModule);
+ ADBStatement.configureADMFormatSerialization(serdeModule);
+ ADBRowStore.configureADMFormatDeserialization(mapper, serdeModule);
mapper.registerModule(serdeModule);
return mapper;
}
@@ -83,4 +100,36 @@
}
return Collections.unmodifiableMap(m);
}
+
+ public ADBErrorReporter getErrorReporter() {
+ return errorReporter;
+ }
+
+ public Logger getLogger() {
+ return logger;
+ }
+
+ public ObjectReader getGenericObjectReader() {
+ return genericObjectReader;
+ }
+
+ public ObjectWriter getGenericObjectWriter() {
+ return genericObjectWriter;
+ }
+
+ public ObjectReader getAdmFormatObjectReader() {
+ return admFormatObjectReader;
+ }
+
+ public ObjectWriter getAdmFormatObjectWriter() {
+ return admFormatObjectWriter;
+ }
+
+ public ADBProductVersion getDriverVersion() {
+ return driverVersion;
+ }
+
+ public Map<String, ADBDriverProperty> getSupportedProperties() {
+ return supportedProperties;
+ }
}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
index bea8c26..0e2f53d 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
@@ -476,7 +476,8 @@
List<ADBColumn> columns = Collections.singletonList(new ADBColumn("TABLE_TYPE", ADBDatatype.STRING, false));
AbstractValueSerializer stringSer = getADMFormatSerializer(String.class);
- ArrayNode result = (ArrayNode) connection.protocol.driverContext.genericObjectReader.createArrayNode();
+ ArrayNode result =
+ (ArrayNode) connection.protocol.getDriverContext().getGenericObjectReader().createArrayNode();
for (String tableType : tableTypes) {
result.addObject().put("TABLE_TYPE", stringSer.serializeToString(tableType));
}
@@ -511,7 +512,8 @@
columns.add(new ADBColumn("SQL_DATETIME_SUB", ADBDatatype.INTEGER, true));
columns.add(new ADBColumn("NUM_PREC_RADIX", ADBDatatype.INTEGER, true));
- ArrayNode result = (ArrayNode) connection.protocol.driverContext.genericObjectReader.createArrayNode();
+ ArrayNode result =
+ (ArrayNode) connection.protocol.getDriverContext().getGenericObjectReader().createArrayNode();
populateTypeInfo(result.addObject(), ADBDatatype.BOOLEAN, 1, null, null, null, null, null, null, int16Ser,
int32Ser, stringSer);
populateTypeInfo(result.addObject(), ADBDatatype.TINYINT, 3, 10, 0, 0, false, null, null, int16Ser, int32Ser,
@@ -586,8 +588,8 @@
}
@Override
- protected ADBProtocol.SubmitStatementOptions createSubmitStatementOptions() {
- ADBProtocol.SubmitStatementOptions options = super.createSubmitStatementOptions();
+ protected ADBProtocolBase.SubmitStatementOptions createSubmitStatementOptions() {
+ ADBProtocolBase.SubmitStatementOptions options = super.createSubmitStatementOptions();
// Metadata queries are always executed in SQL++ mode
options.sqlCompatMode = false;
return options;
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
index aa88f1c..c95d462 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
@@ -51,13 +51,14 @@
ADBPreparedStatement(ADBConnection connection, String sql) throws SQLException {
super(connection);
- ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ ADBProtocolBase.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
stmtOptions.compileOnly = true;
stmtOptions.timeoutSeconds = 0; /* TODO:timeout */
- ADBProtocol.QueryServiceResponse response = connection.protocol.submitStatement(sql, null, null, stmtOptions);
+ ADBProtocolBase.QueryServiceResponse response =
+ connection.protocol.submitStatement(sql, null, null, stmtOptions);
int parameterCount = connection.protocol.getStatementParameterCount(response);
boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ ADBProtocolBase.QueryServiceResponse.StatementCategory.QUERY);
List<ADBColumn> columns = isQuery ? connection.protocol.getColumns(response) : Collections.emptyList();
this.sql = sql;
this.args = Arrays.asList(new Object[parameterCount]);
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
index cc31c7f..38afb84 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
@@ -19,21 +19,15 @@
package org.apache.asterix.jdbc.core;
-import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@@ -70,69 +64,36 @@
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.MapperFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-public class ADBProtocol {
+public class ADBProtocol extends ADBProtocolBase {
private static final String QUERY_SERVICE_ENDPOINT_PATH = "/query/service";
private static final String QUERY_RESULT_ENDPOINT_PATH = "/query/service/result";
private static final String ACTIVE_REQUESTS_ENDPOINT_PATH = "/admin/requests/running";
- private static final String STATEMENT = "statement";
- private static final String ARGS = "args";
- private static final String MODE = "mode";
- private static final String READ_ONLY = "readonly";
- private static final String DATAVERSE = "dataverse";
- private static final String TIMEOUT = "timeout";
- private static final String SIGNATURE = "signature";
- private static final String COMPILE_ONLY = "compile-only";
- private static final String CLIENT_TYPE = "client-type";
- private static final String PLAN_FORMAT = "plan-format";
- private static final String MAX_WARNINGS = "max-warnings";
- private static final String SQL_COMPAT = "sql-compat";
- private static final String CLIENT_CONTEXT_ID = "client_context_id";
-
- private static final String MODE_DEFERRED = "deferred";
- private static final String CLIENT_TYPE_JDBC = "jdbc";
- private static final String RESULTS = "results";
- private static final String FORMAT_LOSSLESS_ADM = "lossless-adm";
- private static final String PLAN_FORMAT_STRING = "string";
-
- private static final String OPTIONAL_TYPE_SUFFIX = "?";
- static final char TEXT_DELIMITER = ':';
- static final String EXPLAIN_ONLY_RESULT_COLUMN_NAME = "$1";
- static final String DEFAULT_DATAVERSE = "Default";
-
- final ADBDriverContext driverContext;
- final HttpClientConnectionManager httpConnectionManager;
- final HttpClientContext httpClientContext;
- final CloseableHttpClient httpClient;
final URI queryEndpoint;
final URI queryResultEndpoint;
final URI activeRequestsEndpoint;
- final String user;
- final int maxWarnings;
+ final HttpClientConnectionManager httpConnectionManager;
+ final HttpClientContext httpClientContext;
+ final CloseableHttpClient httpClient;
- protected ADBProtocol(String host, int port, Map<ADBDriverProperty, Object> params, ADBDriverContext driverContext)
+ public ADBProtocol(String host, int port, Map<ADBDriverProperty, Object> params, ADBDriverContext driverContext)
throws SQLException {
- URI queryEndpoint = createEndpointUri(host, port, getQueryServiceEndpointPath(), driverContext.errorReporter);
+ super(driverContext, params);
+ URI queryEndpoint =
+ createEndpointUri(host, port, QUERY_SERVICE_ENDPOINT_PATH, driverContext.getErrorReporter());
URI queryResultEndpoint =
- createEndpointUri(host, port, getQueryResultEndpointPath(), driverContext.errorReporter);
+ createEndpointUri(host, port, QUERY_RESULT_ENDPOINT_PATH, driverContext.getErrorReporter());
URI activeRequestsEndpoint =
- createEndpointUri(host, port, getActiveRequestsEndpointPath(params), driverContext.errorReporter);
+ createEndpointUri(host, port, getActiveRequestsEndpointPath(params), driverContext.getErrorReporter());
+
PoolingHttpClientConnectionManager httpConnectionManager = new PoolingHttpClientConnectionManager();
int maxConnections = Math.max(16, Runtime.getRuntime().availableProcessors());
httpConnectionManager.setDefaultMaxPerRoute(maxConnections);
@@ -156,45 +117,24 @@
requestConfigBuilder.setSocketTimeout(socketTimeoutMillis.intValue());
}
RequestConfig requestConfig = requestConfigBuilder.build();
-
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(httpConnectionManager);
httpClientBuilder.setConnectionManagerShared(true);
httpClientBuilder.setDefaultRequestConfig(requestConfig);
- String user = (String) ADBDriverProperty.Common.USER.fetchPropertyValue(params);
if (user != null) {
String password = (String) ADBDriverProperty.Common.PASSWORD.fetchPropertyValue(params);
httpClientBuilder.setDefaultCredentialsProvider(createCredentialsProvider(user, password));
}
- Number maxWarnings = (Number) ADBDriverProperty.Common.MAX_WARNINGS.fetchPropertyValue(params);
-
- this.user = user;
this.queryEndpoint = queryEndpoint;
this.queryResultEndpoint = queryResultEndpoint;
this.activeRequestsEndpoint = activeRequestsEndpoint;
this.httpConnectionManager = httpConnectionManager;
this.httpClient = httpClientBuilder.build();
this.httpClientContext = createHttpClientContext(queryEndpoint);
- this.driverContext = Objects.requireNonNull(driverContext);
- this.maxWarnings = Math.max(maxWarnings.intValue(), 0);
}
- private static URI createEndpointUri(String host, int port, String path, ADBErrorReporter errorReporter)
- throws SQLException {
- try {
- return new URI("http", null, host, port, path, null, null);
- } catch (URISyntaxException e) {
- throw errorReporter.errorParameterValueNotSupported("endpoint " + host + ":" + port);
- }
- }
-
- private static CredentialsProvider createCredentialsProvider(String user, String password) {
- CredentialsProvider cp = new BasicCredentialsProvider();
- cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
- return cp;
- }
-
- void close() throws SQLException {
+ @Override
+ public void close() throws SQLException {
try {
httpClient.close();
} catch (IOException e) {
@@ -204,7 +144,8 @@
}
}
- String connect() throws SQLException {
+ @Override
+ public String connect() throws SQLException {
String databaseVersion = pingImpl(-1, true); // TODO:review timeout
if (getLogger().isLoggable(Level.FINE)) {
getLogger().log(Level.FINE, String.format("connected to '%s' at %s", databaseVersion, queryEndpoint));
@@ -212,7 +153,8 @@
return databaseVersion;
}
- boolean ping(int timeoutSeconds) {
+ @Override
+ public boolean ping(int timeoutSeconds) {
try {
pingImpl(timeoutSeconds, false);
return true;
@@ -247,8 +189,9 @@
}
}
- QueryServiceResponse submitStatement(String sql, List<?> args, UUID executionId, SubmitStatementOptions options)
- throws SQLException {
+ @Override
+ public QueryServiceResponse submitStatement(String sql, List<?> args, UUID executionId,
+ SubmitStatementOptions options) throws SQLException {
HttpPost httpPost = new HttpPost(queryEndpoint);
httpPost.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON
.withParameters(new BasicNameValuePair(FORMAT_LOSSLESS_ADM, Boolean.TRUE.toString())).toString());
@@ -256,7 +199,7 @@
ByteArrayOutputStreamImpl baos = new ByteArrayOutputStreamImpl(512);
try {
JsonGenerator jsonGen =
- driverContext.genericObjectWriter.getFactory().createGenerator(baos, JsonEncoding.UTF8);
+ driverContext.getGenericObjectWriter().getFactory().createGenerator(baos, JsonEncoding.UTF8);
jsonGen.writeStartObject();
jsonGen.writeStringField(CLIENT_TYPE, CLIENT_TYPE_JDBC);
jsonGen.writeStringField(MODE, MODE_DEFERRED);
@@ -284,7 +227,7 @@
}
if (args != null && !args.isEmpty()) {
jsonGen.writeFieldName(ARGS);
- driverContext.admFormatObjectWriter.writeValue(jsonGen, args);
+ driverContext.getAdmFormatObjectWriter().writeValue(jsonGen, args);
}
jsonGen.writeEndObject();
jsonGen.flush();
@@ -309,21 +252,6 @@
}
}
- public static class SubmitStatementOptions {
- public String dataverseName;
- public int timeoutSeconds;
- public boolean forceReadOnly;
- public boolean compileOnly;
- public boolean sqlCompatMode;
-
- private SubmitStatementOptions() {
- }
- }
-
- SubmitStatementOptions createSubmitStatementOptions() {
- return new SubmitStatementOptions();
- }
-
private QueryServiceResponse handlePostQueryResponse(CloseableHttpResponse httpResponse)
throws SQLException, IOException {
int httpStatus = httpResponse.getStatusLine().getStatusCode();
@@ -341,7 +269,8 @@
}
QueryServiceResponse response;
try (InputStream contentStream = httpResponse.getEntity().getContent()) {
- response = driverContext.genericObjectReader.forType(QueryServiceResponse.class).readValue(contentStream);
+ response =
+ driverContext.getGenericObjectReader().forType(QueryServiceResponse.class).readValue(contentStream);
}
QueryServiceResponse.Status status = response.status;
if (httpStatus == HttpStatus.SC_OK && status == QueryServiceResponse.Status.SUCCESS) {
@@ -358,7 +287,8 @@
}
}
- JsonParser fetchResult(QueryServiceResponse response) throws SQLException {
+ @Override
+ public JsonParser fetchResult(QueryServiceResponse response) throws SQLException {
if (response.handle == null) {
throw getErrorReporter().errorInProtocol();
}
@@ -387,7 +317,7 @@
}
HttpEntity entity = httpResponse.getEntity();
httpContentStream = entity.getContent();
- parser = driverContext.genericObjectReader.getFactory()
+ parser = driverContext.getGenericObjectReader().getFactory()
.createParser(new InputStreamWithAttachedResource(httpContentStream, httpResponse));
if (!advanceToArrayField(parser, RESULTS)) {
throw getErrorReporter().errorInProtocol();
@@ -425,29 +355,8 @@
}
}
- ArrayNode fetchExplainOnlyResult(QueryServiceResponse response,
- ADBPreparedStatement.AbstractValueSerializer stringSer) throws SQLException {
- if (response.results == null || response.results.isEmpty()) {
- throw getErrorReporter().errorInProtocol();
- }
- Object v = response.results.get(0);
- if (!(v instanceof String)) {
- throw getErrorReporter().errorInProtocol();
- }
- try (BufferedReader br = new BufferedReader(new StringReader(v.toString()))) {
- ArrayNode arrayNode = (ArrayNode) driverContext.genericObjectReader.createArrayNode();
- String line;
- while ((line = br.readLine()) != null) {
- arrayNode.addObject().put(ADBProtocol.EXPLAIN_ONLY_RESULT_COLUMN_NAME,
- stringSer.serializeToString(line));
- }
- return arrayNode;
- } catch (IOException e) {
- throw getErrorReporter().errorInResultHandling(e);
- }
- }
-
- void cancelStatementExecution(UUID executionId) throws SQLException {
+ @Override
+ public void cancelRunningStatement(UUID executionId) throws SQLException {
HttpDelete httpDelete;
try {
URIBuilder uriBuilder = new URIBuilder(activeRequestsEndpoint);
@@ -474,7 +383,23 @@
}
}
- private HttpClientContext createHttpClientContext(URI uri) {
+ @Override
+ public ADBErrorReporter getErrorReporter() {
+ return driverContext.getErrorReporter();
+ }
+
+ @Override
+ public Logger getLogger() {
+ return driverContext.getLogger();
+ }
+
+ private static CredentialsProvider createCredentialsProvider(String user, String password) {
+ CredentialsProvider cp = new BasicCredentialsProvider();
+ cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
+ return cp;
+ }
+
+ private static HttpClientContext createHttpClientContext(URI uri) {
HttpClientContext hcCtx = HttpClientContext.create();
AuthCache ac = new BasicAuthCache();
ac.put(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), new BasicScheme());
@@ -482,134 +407,6 @@
return hcCtx;
}
- boolean isStatementCategory(QueryServiceResponse response, QueryServiceResponse.StatementCategory category) {
- return response.plans != null && category.equals(response.plans.statementCategory);
- }
-
- int getUpdateCount(QueryServiceResponse response) {
- // TODO:need to get update count through the response
- return isStatementCategory(response, QueryServiceResponse.StatementCategory.UPDATE) ? 1 : 0;
- }
-
- SQLException getErrorIfExists(QueryServiceResponse response) {
- if (response.errors != null && !response.errors.isEmpty()) {
- QueryServiceResponse.Message err = response.errors.get(0);
- return new SQLException(err.msg, null, err.code);
- }
- return null;
- }
-
- List<QueryServiceResponse.Message> getWarningIfExists(QueryServiceResponse response) {
- return response.warnings != null && !response.warnings.isEmpty() ? response.warnings : null;
- }
-
- SQLWarning createSQLWarning(List<QueryServiceResponse.Message> warnings) {
- SQLWarning sqlWarning = null;
- ListIterator<QueryServiceResponse.Message> i = warnings.listIterator(warnings.size());
- while (i.hasPrevious()) {
- QueryServiceResponse.Message w = i.previous();
- SQLWarning sw = new SQLWarning(w.msg, null, w.code);
- if (sqlWarning != null) {
- sw.setNextWarning(sqlWarning);
- }
- sqlWarning = sw;
- }
- return sqlWarning;
- }
-
- List<ADBColumn> getColumns(QueryServiceResponse response) throws SQLException {
- if (isExplainOnly(response)) {
- return Collections.singletonList(new ADBColumn(EXPLAIN_ONLY_RESULT_COLUMN_NAME, ADBDatatype.STRING, false));
- }
- QueryServiceResponse.Signature signature = response.signature;
- if (signature == null) {
- throw getErrorReporter().errorInProtocol();
- }
- List<String> nameList = signature.name;
- List<String> typeList = signature.type;
- if (nameList == null || nameList.isEmpty() || typeList == null || typeList.isEmpty()) {
- throw getErrorReporter().errorBadResultSignature();
- }
- int count = nameList.size();
- List<ADBColumn> result = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- String columnName = nameList.get(i);
- String typeName = typeList.get(i);
- boolean optional = false;
- if (typeName.endsWith(OPTIONAL_TYPE_SUFFIX)) {
- optional = true;
- typeName = typeName.substring(0, typeName.length() - OPTIONAL_TYPE_SUFFIX.length());
- }
- ADBDatatype columnType = ADBDatatype.findByTypeName(typeName);
- if (columnType == null) {
- throw getErrorReporter().errorBadResultSignature();
- }
- result.add(new ADBColumn(columnName, columnType, optional));
- }
- return result;
- }
-
- boolean isExplainOnly(QueryServiceResponse response) {
- return response.plans != null && Boolean.TRUE.equals(response.plans.explainOnly);
- }
-
- int getStatementParameterCount(QueryServiceResponse response) throws SQLException {
- QueryServiceResponse.Plans plans = response.plans;
- if (plans == null) {
- throw getErrorReporter().errorInProtocol();
- }
- if (plans.statementParameters == null) {
- return 0;
- }
- int paramPos = 0;
- for (Object param : plans.statementParameters) {
- if (param instanceof Number) {
- paramPos = Math.max(paramPos, ((Number) param).intValue());
- } else {
- throw getErrorReporter().errorParameterNotSupported(String.valueOf(param));
- }
- }
- return paramPos;
- }
-
- JsonParser createJsonParser(JsonNode node) {
- return driverContext.genericObjectReader.treeAsTokens(node);
- }
-
- ADBErrorReporter getErrorReporter() {
- return driverContext.errorReporter;
- }
-
- Logger getLogger() {
- return driverContext.logger;
- }
-
- static ObjectMapper createObjectMapper() {
- ObjectMapper om = new ObjectMapper();
- om.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.NON_PRIVATE);
- // serialization
- om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
-
- // deserialization
- om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- om.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
- om.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
- return om;
- }
-
- protected String getQueryServiceEndpointPath() {
- return QUERY_SERVICE_ENDPOINT_PATH;
- }
-
- protected String getQueryResultEndpointPath() {
- return QUERY_RESULT_ENDPOINT_PATH;
- }
-
- protected String getActiveRequestsEndpointPath(Map<ADBDriverProperty, Object> params) {
- String path = (String) ADBDriverProperty.Common.ACTIVE_REQUESTS_PATH.fetchPropertyValue(params);
- return path != null ? path : ACTIVE_REQUESTS_ENDPOINT_PATH;
- }
-
private static void closeQuietly(Exception mainExc, java.io.Closeable... closeableList) {
for (Closeable closeable : closeableList) {
if (closeable != null) {
@@ -624,6 +421,20 @@
}
}
+ private static URI createEndpointUri(String host, int port, String path, ADBErrorReporter errorReporter)
+ throws SQLException {
+ try {
+ return new URI("http", null, host, port, path, null, null);
+ } catch (URISyntaxException e) {
+ throw errorReporter.errorParameterValueNotSupported("endpoint " + host + ":" + port);
+ }
+ }
+
+ private String getActiveRequestsEndpointPath(Map<ADBDriverProperty, Object> params) {
+ String path = (String) ADBDriverProperty.Common.ACTIVE_REQUESTS_PATH.fetchPropertyValue(params);
+ return path != null ? path : ACTIVE_REQUESTS_ENDPOINT_PATH;
+ }
+
static final class ByteArrayOutputStreamImpl extends ByteArrayOutputStream implements ContentProducer {
private ByteArrayOutputStreamImpl(int size) {
super(size);
@@ -665,45 +476,4 @@
}
}
- public static class QueryServiceResponse {
-
- public Status status;
- public Plans plans;
- public Signature signature;
- public String handle;
- public List<?> results; // currently only used for EXPLAIN results
- public List<Message> errors;
- public List<Message> warnings;
-
- public enum Status {
- RUNNING,
- SUCCESS,
- TIMEOUT,
- FAILED,
- FATAL
- }
-
- public static class Signature {
- List<String> name;
- List<String> type;
- }
-
- public static class Plans {
- StatementCategory statementCategory;
- List<Object> statementParameters;
- Boolean explainOnly;
- }
-
- public static class Message {
- int code;
- String msg;
- }
-
- public enum StatementCategory {
- QUERY,
- UPDATE,
- DDL,
- PROCEDURE
- }
- }
}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java
new file mode 100644
index 0000000..39db835
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocolBase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+public abstract class ADBProtocolBase {
+
+ public static final String STATEMENT = "statement";
+ public static final String ARGS = "args";
+ public static final String MODE = "mode";
+ public static final String READ_ONLY = "readonly";
+ public static final String DATAVERSE = "dataverse";
+ public static final String TIMEOUT = "timeout";
+ public static final String SIGNATURE = "signature";
+ public static final String COMPILE_ONLY = "compile-only";
+ public static final String CLIENT_TYPE = "client-type";
+ public static final String PLAN_FORMAT = "plan-format";
+ public static final String MAX_WARNINGS = "max-warnings";
+ public static final String SQL_COMPAT = "sql-compat";
+ public static final String CLIENT_CONTEXT_ID = "client_context_id";
+
+ public static final String MODE_DEFERRED = "deferred";
+ public static final String CLIENT_TYPE_JDBC = "jdbc";
+ public static final String RESULTS = "results";
+ public static final String FORMAT_LOSSLESS_ADM = "lossless-adm";
+ public static final String PLAN_FORMAT_STRING = "string";
+
+ private static final String DEFAULT_DATAVERSE = "Default";
+ private static final String OPTIONAL_TYPE_SUFFIX = "?";
+ private static final String EXPLAIN_ONLY_RESULT_COLUMN_NAME = "$1";
+
+ protected final ADBDriverContext driverContext;
+ protected final String user;
+ protected final int maxWarnings;
+
+ protected ADBProtocolBase(ADBDriverContext driverContext, Map<ADBDriverProperty, Object> params) {
+ this.driverContext = Objects.requireNonNull(driverContext);
+ this.user = (String) ADBDriverProperty.Common.USER.fetchPropertyValue(params);
+ Number maxWarningsNum = (Number) ADBDriverProperty.Common.MAX_WARNINGS.fetchPropertyValue(params);
+ this.maxWarnings = Math.max(maxWarningsNum.intValue(), 0);
+ }
+
+ public abstract String connect() throws SQLException;
+
+ public abstract void close() throws SQLException;
+
+ public abstract boolean ping(int timeoutSeconds);
+
+ public abstract QueryServiceResponse submitStatement(String sql, List<?> args, UUID executionId,
+ SubmitStatementOptions options) throws SQLException;
+
+ public abstract JsonParser fetchResult(QueryServiceResponse response) throws SQLException;
+
+ public abstract void cancelRunningStatement(UUID executionId) throws SQLException;
+
+ public String getUser() {
+ return user;
+ }
+
+ public ADBDriverContext getDriverContext() {
+ return driverContext;
+ }
+
+ public ADBErrorReporter getErrorReporter() {
+ return getDriverContext().getErrorReporter();
+ }
+
+ public Logger getLogger() {
+ return getDriverContext().getLogger();
+ }
+
+ public SubmitStatementOptions createSubmitStatementOptions() {
+ return new SubmitStatementOptions();
+ }
+
+ public int getUpdateCount(QueryServiceResponse response) {
+ // TODO:need to get update count through the response
+ return isStatementCategory(response, QueryServiceResponse.StatementCategory.UPDATE) ? 1 : 0;
+ }
+
+ public ArrayNode fetchExplainOnlyResult(QueryServiceResponse response, Function<String, String> lineConverter)
+ throws SQLException {
+ if (response.results == null || response.results.isEmpty()) {
+ throw getErrorReporter().errorInProtocol();
+ }
+ Object v = response.results.get(0);
+ if (!(v instanceof String)) {
+ throw getErrorReporter().errorInProtocol();
+ }
+ try (BufferedReader br = new BufferedReader(new StringReader(v.toString()))) {
+ ArrayNode arrayNode = (ArrayNode) getDriverContext().getGenericObjectReader().createArrayNode();
+ String line;
+ while ((line = br.readLine()) != null) {
+ arrayNode.addObject().put(EXPLAIN_ONLY_RESULT_COLUMN_NAME, lineConverter.apply(line));
+ }
+ return arrayNode;
+ } catch (IOException e) {
+ throw getErrorReporter().errorInResultHandling(e);
+ }
+ }
+
+ public boolean isStatementCategory(QueryServiceResponse response, QueryServiceResponse.StatementCategory category) {
+ return response.plans != null && category.equals(response.plans.statementCategory);
+ }
+
+ public SQLException getErrorIfExists(QueryServiceResponse response) {
+ if (response.errors != null && !response.errors.isEmpty()) {
+ QueryServiceResponse.Message err = response.errors.get(0);
+ return new SQLException(err.msg, null, err.code);
+ }
+ return null;
+ }
+
+ public List<QueryServiceResponse.Message> getWarningIfExists(QueryServiceResponse response) {
+ return response.warnings != null && !response.warnings.isEmpty() ? response.warnings : null;
+ }
+
+ public SQLWarning createSQLWarning(List<QueryServiceResponse.Message> warnings) {
+ SQLWarning sqlWarning = null;
+ ListIterator<QueryServiceResponse.Message> i = warnings.listIterator(warnings.size());
+ while (i.hasPrevious()) {
+ QueryServiceResponse.Message w = i.previous();
+ SQLWarning sw = new SQLWarning(w.msg, null, w.code);
+ if (sqlWarning != null) {
+ sw.setNextWarning(sqlWarning);
+ }
+ sqlWarning = sw;
+ }
+ return sqlWarning;
+ }
+
+ public List<ADBColumn> getColumns(QueryServiceResponse response) throws SQLException {
+ if (isExplainOnly(response)) {
+ return Collections.singletonList(new ADBColumn(EXPLAIN_ONLY_RESULT_COLUMN_NAME, ADBDatatype.STRING, false));
+ }
+ QueryServiceResponse.Signature signature = response.signature;
+ if (signature == null) {
+ throw getErrorReporter().errorInProtocol();
+ }
+ List<String> nameList = signature.name;
+ List<String> typeList = signature.type;
+ if (nameList == null || nameList.isEmpty() || typeList == null || typeList.isEmpty()) {
+ throw getErrorReporter().errorBadResultSignature();
+ }
+ int count = nameList.size();
+ List<ADBColumn> result = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ String columnName = nameList.get(i);
+ String typeName = typeList.get(i);
+ boolean optional = false;
+ if (typeName.endsWith(OPTIONAL_TYPE_SUFFIX)) {
+ optional = true;
+ typeName = typeName.substring(0, typeName.length() - OPTIONAL_TYPE_SUFFIX.length());
+ }
+ ADBDatatype columnType = ADBDatatype.findByTypeName(typeName);
+ if (columnType == null) {
+ throw getErrorReporter().errorBadResultSignature();
+ }
+ result.add(new ADBColumn(columnName, columnType, optional));
+ }
+ return result;
+ }
+
+ public boolean isExplainOnly(QueryServiceResponse response) {
+ return response.plans != null && Boolean.TRUE.equals(response.plans.explainOnly);
+ }
+
+ public int getStatementParameterCount(QueryServiceResponse response) throws SQLException {
+ QueryServiceResponse.Plans plans = response.plans;
+ if (plans == null) {
+ throw getErrorReporter().errorInProtocol();
+ }
+ if (plans.statementParameters == null) {
+ return 0;
+ }
+ int paramPos = 0;
+ for (Object param : plans.statementParameters) {
+ if (param instanceof Number) {
+ paramPos = Math.max(paramPos, ((Number) param).intValue());
+ } else {
+ throw getErrorReporter().errorParameterNotSupported(String.valueOf(param));
+ }
+ }
+ return paramPos;
+ }
+
+ public String getDefaultDataverse() {
+ return DEFAULT_DATAVERSE;
+ }
+
+ public static class SubmitStatementOptions {
+ public String dataverseName;
+ public int timeoutSeconds;
+ public boolean forceReadOnly;
+ public boolean compileOnly;
+ public boolean sqlCompatMode;
+ }
+
+ public static class QueryServiceResponse {
+
+ public Status status;
+ public Plans plans;
+ public Signature signature;
+ public String handle;
+ public List<?> results; // currently only used for EXPLAIN results
+ public List<Message> errors;
+ public List<Message> warnings;
+
+ public enum Status {
+ RUNNING,
+ SUCCESS,
+ TIMEOUT,
+ FAILED,
+ FATAL
+ }
+
+ public enum StatementCategory {
+ QUERY,
+ UPDATE,
+ DDL,
+ PROCEDURE
+ }
+
+ public static class Signature {
+ List<String> name;
+ List<String> type;
+ }
+
+ public static class Plans {
+ StatementCategory statementCategory;
+ List<Object> statementParameters;
+ Boolean explainOnly;
+ }
+
+ public static class Message {
+ int code;
+ String msg;
+ }
+ }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java
index 965855c..8ba5b83 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java
@@ -364,9 +364,9 @@
private ObjectReader getComplexColumnReader() {
if (complexColumnReader == null) {
- ADBDriverContext ctx = metadata.statement.connection.protocol.driverContext;
+ ADBDriverContext ctx = metadata.statement.connection.protocol.getDriverContext();
ADBRowStore tmpStore = createRowStore(1);
- complexColumnReader = tmpStore.createComplexColumnObjectReader(ctx.admFormatObjectReader);
+ complexColumnReader = tmpStore.createComplexColumnObjectReader(ctx.getAdmFormatObjectReader());
}
return complexColumnReader;
}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
index 29957cd..ad9d691 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
@@ -66,6 +66,7 @@
final class ADBRowStore {
+ static final char TEXT_DELIMITER = ':';
private static final String ROW_STORE_ATTR_NAME = ADBRowStore.class.getSimpleName();
private static final int FLOAT_NAN_BITS = Float.floatToIntBits(Float.NaN);
@@ -154,7 +155,7 @@
objectStore[columnIndex] = new String(textChars, nonTaggedOffset, nonTaggedLength);
break;
case DURATION:
- int delimiterOffset = indexOf(ADBProtocol.TEXT_DELIMITER, textChars, nonTaggedOffset, nonTaggedEnd);
+ int delimiterOffset = indexOf(TEXT_DELIMITER, textChars, nonTaggedOffset, nonTaggedEnd);
if (delimiterOffset < 0 || delimiterOffset == nonTaggedEnd - 1) {
throw getErrorReporter().errorInProtocol();
}
@@ -1035,7 +1036,7 @@
jsonGenBuffer = new StringWriter();
try {
//TODO:FIXME:need to configure generator to print java.sql.Date/Times properly
- jsonGen = resultSet.metadata.statement.connection.protocol.driverContext.genericObjectWriter
+ jsonGen = resultSet.metadata.statement.connection.protocol.getDriverContext().getGenericObjectWriter()
.getFactory().createGenerator(jsonGenBuffer);
} catch (IOException e) {
throw getErrorReporter().errorInResultHandling(e);
@@ -1056,7 +1057,7 @@
return templateReader.withAttribute(ROW_STORE_ATTR_NAME, this);
}
- static void configureDeserialization(ObjectMapper objectMapper, SimpleModule serdeModule) {
+ static void configureADMFormatDeserialization(ObjectMapper objectMapper, SimpleModule serdeModule) {
objectMapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true);
serdeModule.setDeserializerModifier(createADMFormatDeserializerModifier());
}
@@ -1146,7 +1147,7 @@
parsedLength = 0;
return ADBDatatype.STRING.getTypeTag();
}
- if (textChars[textOffset] == ADBProtocol.TEXT_DELIMITER) {
+ if (textChars[textOffset] == TEXT_DELIMITER) {
// any string
parsedLength = 1;
return ADBDatatype.STRING.getTypeTag();
@@ -1165,7 +1166,7 @@
if (textLength < typeTagLength + delimiterLength) {
throw getErrorReporter().errorInProtocol();
}
- if (textChars[textOffset + typeTagLength] != ADBProtocol.TEXT_DELIMITER) {
+ if (textChars[textOffset + typeTagLength] != TEXT_DELIMITER) {
throw getErrorReporter().errorInProtocol();
}
parsedLength = typeTagLength + delimiterLength;
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
index 61072e8..ce706a4 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
@@ -83,14 +83,14 @@
// common result fields
protected int updateCount = -1;
- protected List<ADBProtocol.QueryServiceResponse.Message> warnings;
+ protected List<ADBProtocolBase.QueryServiceResponse.Message> warnings;
// executeQuery() result fields
protected final ConcurrentLinkedQueue<ADBResultSet> resultSetsWithResources;
protected final ConcurrentLinkedQueue<WeakReference<ADBResultSet>> resultSetsWithoutResources;
// execute() result field
- protected ADBProtocol.QueryServiceResponse executeResponse;
+ protected ADBProtocolBase.QueryServiceResponse executeResponse;
protected ADBResultSet executeResultSet;
// Lifecycle
@@ -157,12 +157,12 @@
protected ADBResultSet executeQueryImpl(String sql, List<?> args) throws SQLException {
// note: we're not assigning executeResponse field at this method
try {
- ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ ADBProtocolBase.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
stmtOptions.forceReadOnly = true;
- ADBProtocol.QueryServiceResponse response =
+ ADBProtocolBase.QueryServiceResponse response =
connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ ADBProtocolBase.QueryServiceResponse.StatementCategory.QUERY);
if (!isQuery) {
throw getErrorReporter().errorInvalidStatementCategory();
}
@@ -218,11 +218,11 @@
protected int executeUpdateImpl(String sql, List<Object> args) throws SQLException {
try {
- ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
- ADBProtocol.QueryServiceResponse response =
+ ADBProtocolBase.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ ADBProtocolBase.QueryServiceResponse response =
connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ ADBProtocolBase.QueryServiceResponse.StatementCategory.QUERY);
// TODO: remove result set on the server (both query and update returning cases)
if (isQuery) {
throw getErrorReporter().errorInvalidStatementCategory();
@@ -258,12 +258,12 @@
protected boolean executeImpl(String sql, List<Object> args) throws SQLException {
try {
- ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
- ADBProtocol.QueryServiceResponse response =
+ ADBProtocolBase.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+ ADBProtocolBase.QueryServiceResponse response =
connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
warnings = connection.protocol.getWarningIfExists(response);
boolean isQuery = connection.protocol.isStatementCategory(response,
- ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+ ADBProtocolBase.QueryServiceResponse.StatementCategory.QUERY);
if (isQuery) {
updateCount = -1;
executeResponse = response;
@@ -281,7 +281,7 @@
@Override
public void cancel() throws SQLException {
checkClosed();
- connection.protocol.cancelStatementExecution(executionId);
+ connection.protocol.cancelRunningStatement(executionId);
}
@Override
@@ -308,8 +308,8 @@
executionId = UUID.randomUUID();
}
- protected ADBProtocol.SubmitStatementOptions createSubmitStatementOptions() {
- ADBProtocol.SubmitStatementOptions stmtOptions = connection.protocol.createSubmitStatementOptions();
+ protected ADBProtocolBase.SubmitStatementOptions createSubmitStatementOptions() {
+ ADBProtocolBase.SubmitStatementOptions stmtOptions = connection.protocol.createSubmitStatementOptions();
stmtOptions.dataverseName = connection.getDataverseCanonicalName();
stmtOptions.sqlCompatMode = connection.sqlCompatMode;
stmtOptions.timeoutSeconds = queryTimeoutSeconds;
@@ -343,7 +343,7 @@
@Override
public ADBResultSet getResultSet() throws SQLException {
checkClosed();
- ADBProtocol.QueryServiceResponse response = executeResponse;
+ ADBProtocolBase.QueryServiceResponse response = executeResponse;
if (response == null) {
return null;
}
@@ -406,14 +406,15 @@
// ResultSet lifecycle
- private ADBResultSet fetchResultSet(ADBProtocol.QueryServiceResponse execResponse) throws SQLException {
+ private ADBResultSet fetchResultSet(ADBProtocolBase.QueryServiceResponse execResponse) throws SQLException {
List<ADBColumn> columns = connection.protocol.getColumns(execResponse);
if (getLogger().isLoggable(Level.FINER)) {
getLogger().log(Level.FINE, "result schema " + columns);
}
if (connection.protocol.isExplainOnly(execResponse)) {
AbstractValueSerializer stringSer = getADMFormatSerializer(String.class);
- ArrayNode explainResult = connection.protocol.fetchExplainOnlyResult(execResponse, stringSer);
+ ArrayNode explainResult =
+ connection.protocol.fetchExplainOnlyResult(execResponse, stringSer::serializeToString);
return createSystemResultSet(columns, explainResult);
} else {
JsonParser rowParser = connection.protocol.fetchResult(execResponse);
@@ -422,12 +423,12 @@
}
protected ADBResultSet createSystemResultSet(List<ADBColumn> columns, ArrayNode values) {
- JsonParser rowParser = connection.protocol.createJsonParser(values);
+ JsonParser rowParser = connection.protocol.getDriverContext().getGenericObjectReader().treeAsTokens(values);
return createResultSetImpl(columns, rowParser, false, 0);
}
protected ADBResultSet createEmptyResultSet() {
- ArrayNode empty = (ArrayNode) connection.protocol.driverContext.genericObjectReader.createArrayNode();
+ ArrayNode empty = (ArrayNode) connection.protocol.getDriverContext().getGenericObjectReader().createArrayNode();
return createSystemResultSet(Collections.emptyList(), empty);
}
@@ -642,7 +643,7 @@
// Serialization
- static void configureSerialization(SimpleModule serdeModule) {
+ static void configureADMFormatSerialization(SimpleModule serdeModule) {
serdeModule.setSerializerModifier(createADMFormatSerializerModifier());
}
@@ -725,7 +726,7 @@
@Override
protected String serializeToString(Object value) {
- return ADBProtocol.TEXT_DELIMITER + String.valueOf(value);
+ return ADBRowStore.TEXT_DELIMITER + String.valueOf(value);
}
};
}
@@ -908,7 +909,7 @@
protected final String serializeToString(Object value) {
StringBuilder textBuilder = new StringBuilder(64); // TODO:optimize?
printByteAsHex(adbType.getTypeTag(), textBuilder);
- textBuilder.append(ADBProtocol.TEXT_DELIMITER);
+ textBuilder.append(ADBRowStore.TEXT_DELIMITER);
serializeNonTaggedValue(value, textBuilder);
return textBuilder.toString();
}
diff --git a/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java b/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java
index 4d175ed..f13ac05 100644
--- a/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java
+++ b/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java
@@ -19,9 +19,16 @@
package org.apache.asterix.jdbc;
-import org.apache.asterix.jdbc.core.ADBDriverBase;
+import java.sql.SQLException;
+import java.util.Map;
-public final class Driver extends ADBDriverBase implements java.sql.Driver {
+import org.apache.asterix.jdbc.core.ADBDriverBase;
+import org.apache.asterix.jdbc.core.ADBDriverContext;
+import org.apache.asterix.jdbc.core.ADBDriverProperty;
+import org.apache.asterix.jdbc.core.ADBProtocol;
+import org.apache.asterix.jdbc.core.ADBProtocolBase;
+
+public class Driver extends ADBDriverBase implements java.sql.Driver {
private static final String DRIVER_SCHEME = "asterixdb:";
@@ -35,4 +42,10 @@
public Driver() {
super(DRIVER_SCHEME, DEFAULT_API_PORT);
}
+
+ @Override
+ protected ADBProtocolBase createProtocol(String host, int port, Map<ADBDriverProperty, Object> properties,
+ ADBDriverContext driverContext) throws SQLException {
+ return new ADBProtocol(host, port, properties, driverContext);
+ }
}