diff --git a/asterixdb-jdbc/LICENSE b/asterixdb-jdbc/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/asterixdb-jdbc/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/asterixdb-jdbc/NOTICE b/asterixdb-jdbc/NOTICE
new file mode 100644
index 0000000..0f871c8
--- /dev/null
+++ b/asterixdb-jdbc/NOTICE
@@ -0,0 +1,8 @@
+Apache AsterixDB JDBC Driver
+Copyright 2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+The Initial Developer of the driver software is Couchbase, Inc.
+Copyright 2021 Couchbase, Inc.
diff --git a/asterixdb-jdbc/asterix-jdbc-core/pom.xml b/asterixdb-jdbc/asterix-jdbc-core/pom.xml
new file mode 100644
index 0000000..87fb7c8
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/pom.xml
@@ -0,0 +1,72 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>apache-asterixdb</artifactId>
+    <groupId>org.apache.asterix</groupId>
+    <version>0.9.7-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-jdbc-core</artifactId>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+
+  <properties>
+    <root.dir>${basedir}/..</root.dir>
+    <source.jdk.version>1.8</source.jdk.version>
+    <target.jdk.version>1.8</target.jdk.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBColumn.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBColumn.java
new file mode 100644
index 0000000..eb2de01
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBColumn.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.util.Objects;
+
+final class ADBColumn {
+
+    private final String name;
+
+    private final ADBDatatype type;
+
+    private final boolean optional;
+
+    ADBColumn(String name, ADBDatatype type, boolean optional) {
+        this.name = Objects.requireNonNull(name);
+        this.type = Objects.requireNonNull(type);
+        this.optional = optional || type.isNullOrMissing() || type == ADBDatatype.ANY;
+    }
+
+    String getName() {
+        return name;
+    }
+
+    ADBDatatype getType() {
+        return type;
+    }
+
+    boolean isOptional() {
+        return optional;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s:%s%s", name, type.getTypeName(), optional ? "?" : "");
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
new file mode 100644
index 0000000..46e38bc
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLPermission;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ADBConnection extends ADBWrapperSupport implements Connection {
+
+    final ADBProtocol protocol;
+
+    final String url;
+
+    final String databaseVersion;
+
+    private final AtomicBoolean closed;
+
+    private final ConcurrentLinkedQueue<ADBStatement> statements;
+
+    private volatile SQLWarning warning;
+
+    private volatile ADBMetaStatement metaStatement;
+
+    volatile String catalog;
+
+    volatile String schema;
+
+    // Lifecycle
+
+    protected ADBConnection(ADBProtocol protocol, String url, String databaseVersion, String catalog, String schema,
+            SQLWarning connectWarning) {
+        this.url = Objects.requireNonNull(url);
+        this.protocol = Objects.requireNonNull(protocol);
+        this.databaseVersion = databaseVersion;
+        this.statements = new ConcurrentLinkedQueue<>();
+        this.warning = connectWarning;
+        this.catalog = catalog;
+        this.schema = schema;
+        this.closed = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void close() throws SQLException {
+        closeImpl(null);
+    }
+
+    @Override
+    public void abort(Executor executor) throws SQLException {
+        if (executor == null) {
+            throw getErrorReporter().errorParameterValueNotSupported("executor");
+        }
+        SecurityManager sec = System.getSecurityManager();
+        if (sec != null) {
+            sec.checkPermission(new SQLPermission("callAbort"));
+        }
+        closeImpl(executor);
+    }
+
+    void closeImpl(Executor executor) throws SQLException {
+        boolean wasClosed = closed.getAndSet(true);
+        if (wasClosed) {
+            return;
+        }
+        if (executor == null) {
+            closeStatementsAndProtocol();
+        } else {
+            executor.execute(() -> {
+                try {
+                    closeStatementsAndProtocol();
+                } catch (SQLException e) {
+                    if (getLogger().isLoggable(Level.FINE)) {
+                        getLogger().log(Level.FINE, e.getMessage(), e);
+                    }
+                }
+            });
+        }
+    }
+
+    private void closeStatementsAndProtocol() throws SQLException {
+        SQLException err = null;
+        try {
+            closeRegisteredStatements();
+        } catch (SQLException e) {
+            err = e;
+        }
+        try {
+            protocol.close();
+        } catch (SQLException e) {
+            if (err != null) {
+                e.addSuppressed(err);
+            }
+            err = e;
+        }
+        if (err != null) {
+            throw err;
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    private void checkClosed() throws SQLException {
+        if (isClosed()) {
+            throw getErrorReporter().errorObjectClosed(Connection.class);
+        }
+    }
+
+    // Connectivity
+
+    @Override
+    public boolean isValid(int timeoutSeconds) throws SQLException {
+        if (isClosed()) {
+            return false;
+        }
+        if (timeoutSeconds < 0) {
+            throw getErrorReporter().errorParameterValueNotSupported("timeoutSeconds");
+        }
+        return protocol.ping(timeoutSeconds);
+    }
+
+    @Override
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "setNetworkTimeout");
+    }
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "getNetworkTimeout");
+    }
+
+    // Metadata
+
+    @Override
+    public DatabaseMetaData getMetaData() throws SQLException {
+        checkClosed();
+        ADBMetaStatement metaStatement = getOrCreateMetaStatement();
+        return createDatabaseMetaData(metaStatement);
+    }
+
+    private ADBMetaStatement getOrCreateMetaStatement() {
+        ADBMetaStatement stmt = metaStatement;
+        if (stmt == null) {
+            synchronized (this) {
+                stmt = metaStatement;
+                if (stmt == null) {
+                    stmt = createMetaStatement();
+                    registerStatement(stmt);
+                    metaStatement = stmt;
+                }
+            }
+        }
+        return stmt;
+    }
+
+    protected ADBMetaStatement createMetaStatement() {
+        return new ADBMetaStatement(this);
+    }
+
+    protected ADBDatabaseMetaData createDatabaseMetaData(ADBMetaStatement metaStatement) {
+        return new ADBDatabaseMetaData(metaStatement, databaseVersion);
+    }
+
+    // Statement construction
+
+    @Override
+    public Statement createStatement() throws SQLException {
+        checkClosed();
+        return createStatementImpl();
+    }
+
+    @Override
+    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+        return createStatement(resultSetType, resultSetConcurrency, getHoldability());
+    }
+
+    @Override
+    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+            throws SQLException {
+        checkClosed();
+        checkResultSetConfig(resultSetType, resultSetConcurrency, resultSetHoldability);
+        return createStatementImpl();
+    }
+
+    private void checkResultSetConfig(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+            throws SQLException {
+        boolean ok = resultSetType == ResultSet.TYPE_FORWARD_ONLY && resultSetConcurrency == ResultSet.CONCUR_READ_ONLY;
+        if (!ok) {
+            throw getErrorReporter().errorParameterValueNotSupported("resultSetType/resultSetConcurrency");
+        }
+        if (resultSetHoldability != ADBResultSet.RESULT_SET_HOLDABILITY) {
+            if (getLogger().isLoggable(Level.FINE)) {
+                getLogger().log(Level.FINE,
+                        getErrorReporter().warningParameterValueNotSupported("ResultSetHoldability"));
+            }
+        }
+    }
+
+    private ADBStatement createStatementImpl() {
+        ADBStatement stmt = new ADBStatement(this, catalog, schema);
+        registerStatement(stmt);
+        return stmt;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        checkClosed();
+        return prepareStatementImpl(sql);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+            throws SQLException {
+        return prepareStatement(sql, resultSetType, resultSetConcurrency, getHoldability());
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
+            int resultSetHoldability) throws SQLException {
+        checkClosed();
+        checkResultSetConfig(resultSetType, resultSetConcurrency, resultSetHoldability);
+        return prepareStatementImpl(sql);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "prepareStatement");
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "prepareStatement");
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "prepareStatement");
+    }
+
+    private ADBPreparedStatement prepareStatementImpl(String sql) throws SQLException {
+        ADBPreparedStatement stmt = new ADBPreparedStatement(this, sql, catalog, schema);
+        registerStatement(stmt);
+        return stmt;
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "prepareCall");
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "prepareCall");
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+            int resultSetHoldability) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "prepareCall");
+    }
+
+    @Override
+    public String nativeSQL(String sql) throws SQLException {
+        checkClosed();
+        return sql;
+    }
+
+    @Override
+    public String getCatalog() throws SQLException {
+        checkClosed();
+        return catalog;
+    }
+
+    @Override
+    public void setCatalog(String catalog) throws SQLException {
+        checkClosed();
+        this.catalog = catalog;
+    }
+
+    @Override
+    public String getSchema() throws SQLException {
+        checkClosed();
+        return schema;
+    }
+
+    @Override
+    public void setSchema(String schema) throws SQLException {
+        checkClosed();
+        this.schema = schema;
+    }
+
+    // Statement lifecycle
+
+    private void registerStatement(ADBStatement stmt) {
+        statements.add(Objects.requireNonNull(stmt));
+    }
+
+    void deregisterStatement(ADBStatement stmt) {
+        statements.remove(Objects.requireNonNull(stmt));
+    }
+
+    private void closeRegisteredStatements() throws SQLException {
+        SQLException err = null;
+        ADBStatement statement;
+        while ((statement = statements.poll()) != null) {
+            try {
+                statement.closeImpl(true, false);
+            } catch (SQLException e) {
+                if (err != null) {
+                    e.addSuppressed(err);
+                }
+                err = e;
+            }
+        }
+        if (err != null) {
+            throw err;
+        }
+    }
+
+    // Transaction control
+
+    @Override
+    public int getTransactionIsolation() throws SQLException {
+        checkClosed();
+        return Connection.TRANSACTION_READ_COMMITTED;
+    }
+
+    @Override
+    public void setTransactionIsolation(int level) throws SQLException {
+        checkClosed();
+        switch (level) {
+            case Connection.TRANSACTION_READ_COMMITTED:
+                break;
+            case Connection.TRANSACTION_READ_UNCOMMITTED:
+            case Connection.TRANSACTION_REPEATABLE_READ:
+            case Connection.TRANSACTION_SERIALIZABLE:
+                if (getLogger().isLoggable(Level.FINE)) {
+                    getLogger().log(Level.FINE,
+                            getErrorReporter().warningParameterValueNotSupported("TransactionIsolationLevel"));
+                }
+                break;
+            default:
+                throw getErrorReporter().errorParameterValueNotSupported("TransactionIsolationLevel");
+        }
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        checkClosed();
+        return ResultSet.HOLD_CURSORS_OVER_COMMIT;
+    }
+
+    @Override
+    public void setHoldability(int holdability) throws SQLException {
+        checkClosed();
+        switch (holdability) {
+            case ResultSet.HOLD_CURSORS_OVER_COMMIT:
+                break;
+            case ResultSet.CLOSE_CURSORS_AT_COMMIT:
+                if (getLogger().isLoggable(Level.FINE)) {
+                    getLogger().log(Level.FINE, getErrorReporter().warningParameterValueNotSupported("Holdability"));
+                }
+                break;
+            default:
+                throw getErrorReporter().errorParameterValueNotSupported("Holdability");
+        }
+    }
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        checkClosed();
+        return true;
+    }
+
+    @Override
+    public void setAutoCommit(boolean autoCommit) throws SQLException {
+        checkClosed();
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("AutoCommit");
+    }
+
+    @Override
+    public void rollback() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("AutoCommit");
+    }
+
+    @Override
+    public Savepoint setSavepoint() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "setSavepoint");
+    }
+
+    @Override
+    public Savepoint setSavepoint(String name) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "setSavepoint");
+    }
+
+    @Override
+    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "releaseSavepoint");
+    }
+
+    @Override
+    public void rollback(Savepoint savepoint) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "rollback");
+    }
+
+    // Value construction
+
+    @Override
+    public Clob createClob() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "createClob");
+    }
+
+    @Override
+    public Blob createBlob() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "createBlob");
+    }
+
+    @Override
+    public NClob createNClob() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "createNClob");
+    }
+
+    @Override
+    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "createArrayOf");
+    }
+
+    @Override
+    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "createStruct");
+    }
+
+    @Override
+    public SQLXML createSQLXML() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "createSQLXML");
+    }
+
+    @Override
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        checkClosed();
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Connection.class, "setTypeMap");
+    }
+
+    // Unsupported hints (ignored)
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        checkClosed();
+        return false;
+    }
+
+    @Override
+    public void setReadOnly(boolean readOnly) throws SQLException {
+        checkClosed();
+    }
+
+    // Errors and warnings
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        checkClosed();
+        return warning;
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        checkClosed();
+        warning = null;
+    }
+
+    @Override
+    protected ADBErrorReporter getErrorReporter() {
+        return protocol.driverContext.errorReporter;
+    }
+
+    protected Logger getLogger() {
+        return protocol.driverContext.logger;
+    }
+
+    // Miscellaneous unsupported features (error is raised)
+
+    @Override
+    public String getClientInfo(String name) throws SQLException {
+        checkClosed();
+        return null;
+    }
+
+    @Override
+    public Properties getClientInfo() throws SQLException {
+        checkClosed();
+        return new Properties();
+    }
+
+    @Override
+    public void setClientInfo(Properties properties) throws SQLClientInfoException {
+        throw getErrorReporter().errorClientInfoMethodNotSupported(Connection.class, "setClientInfo");
+    }
+
+    @Override
+    public void setClientInfo(String name, String value) throws SQLClientInfoException {
+        throw getErrorReporter().errorClientInfoMethodNotSupported(Connection.class, "setClientInfo");
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
new file mode 100644
index 0000000..a090498
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
@@ -0,0 +1,1038 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.RowIdLifetime;
+import java.sql.SQLException;
+import java.util.Objects;
+
+final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMetaData {
+
+    /*
+     * See org.apache.asterix.metadata.utils.MetadataConstants.METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8
+     */
+    private static final int METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8 = 251;
+
+    private final ADBMetaStatement metaStatement;
+
+    private final String databaseVersionText;
+
+    private volatile ADBProductVersion databaseVersion;
+
+    ADBDatabaseMetaData(ADBMetaStatement metaStatement, String databaseVersionText) {
+        this.metaStatement = Objects.requireNonNull(metaStatement);
+        this.databaseVersionText = databaseVersionText;
+    }
+
+    // Driver name and version
+
+    @Override
+    public String getDriverName() {
+        return metaStatement.connection.protocol.driverContext.driverVersion.productName;
+    }
+
+    @Override
+    public String getDriverVersion() {
+        return metaStatement.connection.protocol.driverContext.driverVersion.productVersion;
+    }
+
+    @Override
+    public int getDriverMajorVersion() {
+        return metaStatement.connection.protocol.driverContext.driverVersion.majorVersion;
+    }
+
+    @Override
+    public int getDriverMinorVersion() {
+        return metaStatement.connection.protocol.driverContext.driverVersion.minorVersion;
+    }
+
+    @Override
+    public int getJDBCMajorVersion() {
+        return ADBDriverBase.JDBC_MAJOR_VERSION;
+    }
+
+    @Override
+    public int getJDBCMinorVersion() {
+        return ADBDriverBase.JDBC_MINOR_VERSION;
+    }
+
+    // Database name and version
+
+    @Override
+    public String getDatabaseProductName() {
+        return getDatabaseVersion().productName;
+    }
+
+    @Override
+    public String getDatabaseProductVersion() {
+        return getDatabaseVersion().productVersion;
+    }
+
+    @Override
+    public int getDatabaseMajorVersion() {
+        return getDatabaseVersion().majorVersion;
+    }
+
+    @Override
+    public int getDatabaseMinorVersion() {
+        return getDatabaseVersion().minorVersion;
+    }
+
+    private ADBProductVersion getDatabaseVersion() {
+        ADBProductVersion result = databaseVersion;
+        if (result == null) {
+            databaseVersion = result = ADBProductVersion.parseDatabaseVersion(databaseVersionText);
+        }
+        return result;
+    }
+
+    // Database objects
+
+    // Catalogs and schemas
+
+    @Override
+    public ADBResultSet getCatalogs() throws SQLException {
+        return metaStatement.executeGetCatalogsQuery();
+    }
+
+    @Override
+    public int getMaxCatalogNameLength() {
+        return METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8;
+    }
+
+    @Override
+    public ADBResultSet getSchemas() throws SQLException {
+        return getSchemas(metaStatement.connection.catalog, null);
+    }
+
+    @Override
+    public ADBResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
+        return metaStatement.executeGetSchemasQuery(catalog, schemaPattern);
+    }
+
+    @Override
+    public int getMaxSchemaNameLength() {
+        return METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8;
+    }
+
+    // Tables
+
+    @Override
+    public ADBResultSet getTableTypes() throws SQLException {
+        return metaStatement.executeGetTableTypesQuery();
+    }
+
+    @Override
+    public ADBResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types)
+            throws SQLException {
+        return metaStatement.executeGetTablesQuery(catalog, schemaPattern, tableNamePattern, types);
+    }
+
+    @Override
+    public ADBResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public int getMaxTableNameLength() {
+        return METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8;
+    }
+
+    @Override
+    public int getMaxColumnsInTable() {
+        return 0;
+    }
+
+    @Override
+    public int getMaxRowSize() {
+        return 0;
+    }
+
+    @Override
+    public boolean doesMaxRowSizeIncludeBlobs() {
+        return true;
+    }
+
+    // Columns
+
+    @Override
+    public ADBResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern,
+            String columnNamePattern) throws SQLException {
+        return metaStatement.executeGetColumnsQuery(catalog, schemaPattern, tableNamePattern, columnNamePattern);
+    }
+
+    @Override
+    public ADBResultSet getPseudoColumns(String catalog, String schemaPattern, String tableNamePattern,
+            String columnNamePattern) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable)
+            throws SQLException {
+        // TODO:primary keys?
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public int getMaxColumnNameLength() {
+        return 0;
+    }
+
+    // Keys
+
+    @Override
+    public ADBResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException {
+        return metaStatement.executeGetPrimaryKeysQuery(catalog, schema, table);
+    }
+
+    @Override
+    public ADBResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable,
+            String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    // Indexes
+
+    @Override
+    public ADBResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public int getMaxColumnsInIndex() {
+        return 0;
+    }
+
+    @Override
+    public int getMaxIndexLength() {
+        return 0;
+    }
+
+    // Datatypes
+
+    @Override
+    public ADBResultSet getTypeInfo() throws SQLException {
+        return metaStatement.executeGetTypeInfoQuery();
+    }
+
+    @Override
+    public ADBResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getUDTs(String catalog, String schemaPattern, String typeNamePattern, int[] types)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getAttributes(String catalog, String schemaPattern, String typeNamePattern,
+            String attributeNamePattern) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public RowIdLifetime getRowIdLifetime() {
+        return RowIdLifetime.ROWID_UNSUPPORTED;
+    }
+
+    @Override
+    public long getMaxLogicalLobSize() {
+        return 0;
+    }
+
+    @Override
+    public boolean supportsRefCursors() {
+        return false;
+    }
+
+    // User-defined functions and procedures
+
+    @Override
+    public boolean supportsStoredProcedures() {
+        return false;
+    }
+
+    @Override
+    public ADBResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern,
+            String columnNamePattern) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getProcedures(String catalog, String schemaPattern, String procedureNamePattern)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getProcedureColumns(String catalog, String schemaPattern, String procedureNamePattern,
+            String columnNamePattern) throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public int getMaxProcedureNameLength() {
+        return METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8;
+    }
+
+    // Security
+
+    @Override
+    public ADBResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    @Override
+    public ADBResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern)
+            throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    // Other database objects
+
+    @Override
+    public ADBResultSet getClientInfoProperties() throws SQLException {
+        return metaStatement.executeEmptyResultQuery();
+    }
+
+    // SQL dialect: general
+
+    @Override
+    public boolean supportsMinimumSQLGrammar() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsCoreSQLGrammar() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsExtendedSQLGrammar() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsANSI92EntryLevelSQL() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsANSI92IntermediateSQL() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsANSI92FullSQL() {
+        return false;
+    }
+
+    @Override
+    public String getSQLKeywords() {
+        // keywords that are not also SQL:2003 keywords
+        return "adapter,apply,asc,autogenerated,btree,closed,compaction,compact,correlate,collection,dataset,"
+                + "dataverse,definition,desc,disconnect,div,explain,enforced,every,feed,flatten,fulltext,hints,if,"
+                + "index,ingestion,internal,keyword,key,known,letting,let,limit,load,missing,mod,nodegroup,ngram,"
+                + "offset,path,policy,pre-sorted,raw,refresh,returning,rtree,run,satisfies,secondary,some,stop,"
+                + "synonym,temporary,type,upsert,use,view,write";
+    }
+
+    @Override
+    public String getCatalogTerm() {
+        return "catalog";
+    }
+
+    @Override
+    public String getSchemaTerm() {
+        return "schema";
+    }
+
+    @Override
+    public String getProcedureTerm() {
+        return "procedure";
+    }
+
+    @Override
+    public int getMaxStatementLength() {
+        return 0;
+    }
+
+    // SQL dialect: identifiers
+
+    @Override
+    public String getExtraNameCharacters() {
+        return "";
+    }
+
+    @Override
+    public String getIdentifierQuoteString() {
+        return "`";
+    }
+
+    @Override
+    public boolean supportsMixedCaseIdentifiers() {
+        return true;
+    }
+
+    @Override
+    public boolean storesMixedCaseIdentifiers() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMixedCaseQuotedIdentifiers() {
+        return true;
+    }
+
+    @Override
+    public boolean storesMixedCaseQuotedIdentifiers() {
+        return false;
+    }
+
+    @Override
+    public boolean storesLowerCaseIdentifiers() {
+        return false;
+    }
+
+    @Override
+    public boolean storesLowerCaseQuotedIdentifiers() {
+        return false;
+    }
+
+    @Override
+    public boolean storesUpperCaseIdentifiers() {
+        return false;
+    }
+
+    @Override
+    public boolean storesUpperCaseQuotedIdentifiers() {
+        return false;
+    }
+
+    // SQL dialect: literals and parameters
+
+    @Override
+    public int getMaxBinaryLiteralLength() {
+        return 0;
+    }
+
+    @Override
+    public int getMaxCharLiteralLength() {
+        return 0;
+    }
+
+    @Override
+    public boolean supportsNamedParameters() {
+        // Procedures (CallableStatement) are not supported
+        return false;
+    }
+
+    // SQL dialect: functions and operators
+
+    @Override
+    public String getNumericFunctions() {
+        // NOTE: JDBC escape clause is not yet supported
+        // "add,div,mod,mult,neg,sub,abs,acos,asin,atan,atan2,ceil,cos,deg,degrees,e,exp,ln,log,floor,inf,nan,neginf,pi,posinf,power,rad,radians,random,round,sign,sin,sqrt,tan,trunc";
+        return "";
+    }
+
+    @Override
+    public String getStringFunctions() {
+        // NOTE: JDBC escape clause is not yet supported
+        // "contains,initcap,length,lower,ltrim,position,pos,regex_contains,regex_like,regex_position,regex_pos,regex_replace,repeat,replace,rtrim,split,substr,title,trim,upper";
+        return "";
+    }
+
+    @Override
+    public String getSystemFunctions() {
+        // NOTE: JDBC escape clause is not yet supported
+        return "";
+    }
+
+    @Override
+    public String getTimeDateFunctions() {
+        // TODO:review
+        return "current_date,current_time,current_datetime";
+    }
+
+    @Override
+    public boolean supportsConvert() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsConvert(int fromType, int toType) {
+        return false;
+    }
+
+    @Override
+    public String getSearchStringEscape() {
+        return "\\";
+    }
+
+    @Override
+    public boolean supportsLikeEscapeClause() {
+        return false;
+    }
+
+    @Override
+    public boolean nullPlusNonNullIsNull() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsStoredFunctionsUsingCallSyntax() {
+        return false;
+    }
+
+    // SQL dialect: SELECT clause
+
+    @Override
+    public int getMaxColumnsInSelect() {
+        return 0;
+    }
+
+    @Override
+    public boolean supportsColumnAliasing() {
+        return true;
+    }
+
+    // SQL dialect: FROM clause
+
+    @Override
+    public boolean allTablesAreSelectable() {
+        return true;
+    }
+
+    @Override
+    public int getMaxTablesInSelect() {
+        return 0;
+    }
+
+    @Override
+    public boolean isCatalogAtStart() {
+        return true;
+    }
+
+    @Override
+    public String getCatalogSeparator() {
+        return ".";
+    }
+
+    @Override
+    public boolean supportsTableCorrelationNames() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsDifferentTableCorrelationNames() {
+        return true;
+    }
+
+    // SQL dialect: JOIN clause
+
+    @Override
+    public boolean supportsOuterJoins() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsLimitedOuterJoins() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsFullOuterJoins() {
+        return false;
+    }
+
+    // SQL dialect: ORDER BY clause
+
+    @Override
+    public int getMaxColumnsInOrderBy() {
+        return 0;
+    }
+
+    @Override
+    public boolean supportsOrderByUnrelated() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsExpressionsInOrderBy() {
+        return true;
+    }
+
+    @Override
+    public boolean nullsAreSortedHigh() {
+        return false;
+    }
+
+    @Override
+    public boolean nullsAreSortedLow() {
+        return true;
+    }
+
+    @Override
+    public boolean nullsAreSortedAtStart() {
+        return false;
+    }
+
+    @Override
+    public boolean nullsAreSortedAtEnd() {
+        return false;
+    }
+
+    // SQL dialect: GROUP BY clause
+
+    @Override
+    public boolean supportsGroupBy() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsGroupByUnrelated() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsGroupByBeyondSelect() {
+        return true;
+    }
+
+    @Override
+    public int getMaxColumnsInGroupBy() {
+        return 0;
+    }
+
+    // SQL dialect: Subquery
+
+    @Override
+    public boolean supportsSubqueriesInComparisons() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInExists() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInIns() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSubqueriesInQuantifieds() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsCorrelatedSubqueries() {
+        return true;
+    }
+
+    // SQL dialect: Set operations
+
+    @Override
+    public boolean supportsUnion() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsUnionAll() {
+        return true;
+    }
+
+    // SQL dialect: DML statements
+
+    @Override
+    public boolean supportsCatalogsInDataManipulation() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSchemasInDataManipulation() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsPositionedDelete() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsPositionedUpdate() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSelectForUpdate() {
+        return false;
+    }
+
+    // SQL dialect: DDL statements
+
+    // DDL: CREATE DATASET
+
+    @Override
+    public boolean supportsCatalogsInTableDefinitions() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSchemasInTableDefinitions() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsNonNullableColumns() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsAlterTableWithAddColumn() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsAlterTableWithDropColumn() {
+        return false;
+    }
+
+    // DDL: CREATE INDEX
+
+    @Override
+    public boolean supportsCatalogsInIndexDefinitions() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsSchemasInIndexDefinitions() {
+        return true;
+    }
+
+    // DDL: GRANT / REVOKE (not supported)
+
+    @Override
+    public boolean supportsCatalogsInPrivilegeDefinitions() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSchemasInPrivilegeDefinitions() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsIntegrityEnhancementFacility() {
+        return false;
+    }
+
+    // SQL dialect: User-defined functions and procedures
+
+    @Override
+    public boolean allProceduresAreCallable() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsCatalogsInProcedureCalls() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSchemasInProcedureCalls() {
+        return false;
+    }
+
+    // Transactions
+
+    @Override
+    public boolean supportsTransactions() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleTransactions() {
+        return true;
+    }
+
+    @Override
+    public int getDefaultTransactionIsolation() {
+        return Connection.TRANSACTION_READ_COMMITTED;
+    }
+
+    @Override
+    public boolean supportsTransactionIsolationLevel(int level) {
+        return Connection.TRANSACTION_READ_COMMITTED == level;
+    }
+
+    @Override
+    public boolean supportsDataDefinitionAndDataManipulationTransactions() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsDataManipulationTransactionsOnly() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsOpenStatementsAcrossCommit() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsOpenCursorsAcrossCommit() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsOpenStatementsAcrossRollback() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsOpenCursorsAcrossRollback() {
+        return false;
+    }
+
+    @Override
+    public boolean dataDefinitionCausesTransactionCommit() {
+        return false;
+    }
+
+    @Override
+    public boolean dataDefinitionIgnoredInTransactions() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsSavepoints() {
+        return false;
+    }
+
+    @Override
+    public boolean autoCommitFailureClosesAllResultSets() {
+        return false;
+    }
+
+    // Connection
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return metaStatement.connection;
+    }
+
+    @Override
+    public int getMaxConnections() {
+        return 0;
+    }
+
+    @Override
+    public String getURL() {
+        return metaStatement.connection.url;
+    }
+
+    @Override
+    public String getUserName() {
+        return metaStatement.connection.protocol.user;
+    }
+
+    @Override
+    public int getMaxUserNameLength() {
+        return 0;
+    }
+
+    // Statement
+
+    @Override
+    public int getMaxStatements() {
+        return 0;
+    }
+
+    @Override
+    public boolean supportsStatementPooling() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsBatchUpdates() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsGetGeneratedKeys() {
+        return false;
+    }
+
+    @Override
+    public boolean generatedKeyAlwaysReturned() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleResultSets() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOpenResults() {
+        return false;
+    }
+
+    @Override
+    public boolean locatorsUpdateCopy() {
+        return false;
+    }
+
+    // ResultSet
+
+    @Override
+    public boolean supportsResultSetType(int type) {
+        return type == ResultSet.TYPE_FORWARD_ONLY;
+    }
+
+    @Override
+    public boolean supportsResultSetConcurrency(int type, int concurrency) {
+        return type == ResultSet.TYPE_FORWARD_ONLY && concurrency == ResultSet.CONCUR_READ_ONLY;
+    }
+
+    @Override
+    public boolean supportsResultSetHoldability(int holdability) {
+        return holdability == ADBResultSet.RESULT_SET_HOLDABILITY;
+    }
+
+    @Override
+    public int getResultSetHoldability() {
+        return ADBResultSet.RESULT_SET_HOLDABILITY;
+    }
+
+    @Override
+    public boolean ownInsertsAreVisible(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean othersInsertsAreVisible(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean insertsAreDetected(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean ownUpdatesAreVisible(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean othersUpdatesAreVisible(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean updatesAreDetected(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean ownDeletesAreVisible(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean othersDeletesAreVisible(int type) {
+        return false;
+    }
+
+    @Override
+    public boolean deletesAreDetected(int type) {
+        return false;
+    }
+
+    @Override
+    public int getMaxCursorNameLength() {
+        return 0;
+    }
+
+    // Miscellaneous
+
+    @Override
+    public boolean isReadOnly() {
+        return false;
+    }
+
+    @Override
+    public boolean usesLocalFiles() {
+        return false;
+    }
+
+    @Override
+    public boolean usesLocalFilePerTable() {
+        return false;
+    }
+
+    // Errors and warnings
+
+    @Override
+    public int getSQLStateType() {
+        return sqlStateSQL;
+    }
+
+    @Override
+    protected ADBErrorReporter getErrorReporter() {
+        return metaStatement.getErrorReporter();
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatatype.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatatype.java
new file mode 100644
index 0000000..e6634b5
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatatype.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.JDBCType;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+enum ADBDatatype {
+    TINYINT(1, "int8", JDBCType.TINYINT, Byte.class),
+    SMALLINT(2, "int16", JDBCType.SMALLINT, Short.class),
+    INTEGER(3, "int32", JDBCType.INTEGER, Integer.class),
+    BIGINT(4, "int64", JDBCType.BIGINT, Long.class),
+
+    //UINT8(5, null, JDBCType.OTHER),
+    //UINT16(6, null, JDBCType.OTHER),
+    //UINT32(7, null, JDBCType.OTHER),
+    //UINT64(8, null, JDBCType.OTHER),
+    //BINARY(9, "binary", JDBCType.VARBINARY, byte[].class),
+    //BITARRAY(10, null, JDBCType.VARBINARY),
+
+    FLOAT(11, "float", JDBCType.REAL, Float.class),
+    DOUBLE(12, "double", JDBCType.DOUBLE, Double.class),
+    STRING(13, "string", JDBCType.VARCHAR, String.class),
+    MISSING(14, "missing", JDBCType.OTHER, Void.class), // don't report as JDBCType.NULL
+    BOOLEAN(15, "boolean", JDBCType.BOOLEAN, Boolean.class),
+    DATETIME(16, "datetime", JDBCType.TIMESTAMP, java.sql.Timestamp.class),
+    DATE(17, "date", JDBCType.DATE, java.sql.Date.class),
+    TIME(18, "time", JDBCType.TIME, java.sql.Time.class),
+    DURATION(19, "duration", JDBCType.OTHER, String.class),
+
+    //POINT(20, "point", JDBCType.OTHER, Object.class),
+    //POINT3D(21, "point3d", JDBCType.OTHER, Object.class),
+
+    ARRAY(22, "array", JDBCType.OTHER, List.class),
+    MULTISET(23, "multiset", JDBCType.OTHER, List.class),
+    OBJECT(24, "object", JDBCType.OTHER, Map.class),
+
+    //SPARSOBJECT(25, null, null, JDBCType.OTHER),
+    //UNION(26, null, JDBCType.OTHER),
+    //ENUM(27, null, JDBCType.OTHER),
+    //TYPE(28, null, JDBCType.OTHER),
+
+    ANY(29, "any", JDBCType.OTHER, String.class),
+
+    //LINE(30, "line", JDBCType.OTHER, Object.class),
+    //POLYGON(31, "polygon", JDBCType.OTHER, Object.class),
+    //CIRCLE(32, "circle", JDBCType.OTHER, Object.class),
+    //RECTANGLE(33, "rectangle", JDBCType.OTHER, Object.class),
+    //INTERVAL(34, "interval", JDBCType.OTHER, Object.class),
+    //SYSTEM_NULL(35, null, null, JDBCType.OTHER),
+
+    YEARMONTHDURATION(36, "year-month-duration", JDBCType.OTHER, java.time.Period.class),
+    DAYTIMEDURATION(37, "day-time-duration", JDBCType.OTHER, java.time.Duration.class),
+    UUID(38, "uuid", JDBCType.OTHER, java.util.UUID.class),
+
+    //SHORTWITHOUTTYPEINFO(40, null, null, JDBCType.OTHER),
+
+    NULL(41, "null", JDBCType.NULL, Void.class);
+
+    //GEOMETRY(42, "geometry", JDBCType.OTHER, Object.class)
+
+    private static final ADBDatatype[] BY_TYPE_TAG;
+
+    private static final Map<String, ADBDatatype> BY_TYPE_NAME;
+
+    private final byte typeTag;
+
+    private final String typeName;
+
+    private final JDBCType jdbcType;
+
+    private final Class<?> javaClass;
+
+    ADBDatatype(int typeTag, String typeName, JDBCType jdbcType, Class<?> javaClass) {
+        this.typeTag = (byte) typeTag;
+        this.typeName = Objects.requireNonNull(typeName);
+        this.jdbcType = Objects.requireNonNull(jdbcType);
+        this.javaClass = Objects.requireNonNull(javaClass);
+    }
+
+    byte getTypeTag() {
+        return typeTag;
+    }
+
+    String getTypeName() {
+        return typeName;
+    }
+
+    JDBCType getJdbcType() {
+        return jdbcType;
+    }
+
+    Class<?> getJavaClass() {
+        return javaClass;
+    }
+
+    @Override
+    public String toString() {
+        return getTypeName();
+    }
+
+    boolean isDerived() {
+        return this == OBJECT || isList();
+    }
+
+    boolean isList() {
+        return this == ARRAY || this == MULTISET;
+    }
+
+    boolean isNullOrMissing() {
+        return this == NULL || this == MISSING;
+    }
+
+    static {
+        ADBDatatype[] allTypes = ADBDatatype.values();
+        ADBDatatype[] byTypeTag = new ADBDatatype[findMaxTypeTag(allTypes) + 1];
+        Map<String, ADBDatatype> byTypeName = new HashMap<>();
+        for (ADBDatatype t : allTypes) {
+            byTypeTag[t.typeTag] = t;
+            byTypeName.put(t.typeName, t);
+        }
+        BY_TYPE_TAG = byTypeTag;
+        BY_TYPE_NAME = byTypeName;
+    }
+
+    public static ADBDatatype findByTypeTag(byte typeTag) {
+        return typeTag >= 0 && typeTag < BY_TYPE_TAG.length ? BY_TYPE_TAG[typeTag] : null;
+    }
+
+    public static ADBDatatype findByTypeName(String typeName) {
+        return BY_TYPE_NAME.get(typeName);
+    }
+
+    private static int findMaxTypeTag(ADBDatatype[] allTypes) {
+        int maxTypeTag = 0;
+        for (ADBDatatype type : allTypes) {
+            if (type.typeTag < 0) {
+                throw new IllegalStateException(type.getTypeName());
+            }
+            maxTypeTag = Math.max(type.typeTag, maxTypeTag);
+        }
+        return maxTypeTag;
+    }
+
+    static String getDerivedRecordName(ADBDatatype type) {
+        switch (type) {
+            case OBJECT:
+                return "Record";
+            case ARRAY:
+                return "OrderedList";
+            case MULTISET:
+                return "UnorderedList";
+            default:
+                throw new IllegalArgumentException(String.valueOf(type));
+        }
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
new file mode 100644
index 0000000..5d89612
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+
+public abstract class ADBDriverBase {
+
+    static final int JDBC_MAJOR_VERSION = 4;
+
+    static final int JDBC_MINOR_VERSION = 2;
+
+    static final String JDBC_SCHEME = "jdbc:";
+
+    static final String LOGGING_PROPERTY_SUFFIX = ".log.stderr";
+
+    protected final String urlScheme;
+
+    protected final int defaultApiPort;
+
+    protected volatile ADBDriverContext context;
+
+    public ADBDriverBase(String driverScheme, int defaultApiPort) {
+        this.urlScheme = JDBC_SCHEME + Objects.requireNonNull(driverScheme);
+        this.defaultApiPort = defaultApiPort;
+    }
+
+    protected static void registerDriver(java.sql.Driver driver) {
+        try {
+            DriverManager.registerDriver(driver);
+        } catch (SQLException e) {
+            DriverManager.println(String.format("Error registering driver %s. %s", driver.getClass().getName(), e));
+        }
+    }
+
+    private static void parseConnectionProperties(List<NameValuePair> inProps1, Properties inProps2,
+            ADBDriverContext driverContext, Map<ADBDriverProperty, Object> outProperties, SQLWarning outWarning)
+            throws SQLException {
+        for (NameValuePair pair : inProps1) {
+            String name = pair.getName();
+            String value = pair.getValue();
+            parseConnectionProperty(name, value, driverContext, outProperties, outWarning);
+        }
+        if (inProps2 != null) {
+            for (Enumeration<?> en = inProps2.propertyNames(); en.hasMoreElements();) {
+                String name = en.nextElement().toString();
+                String value = inProps2.getProperty(name);
+                parseConnectionProperty(name, value, driverContext, outProperties, outWarning);
+            }
+        }
+    }
+
+    private static void parseConnectionProperty(String name, String textValue, ADBDriverContext driverContext,
+            Map<ADBDriverProperty, Object> outProperties, SQLWarning outWarning) throws SQLException {
+        ADBDriverProperty property = driverContext.supportedProperties.get(name);
+        if (property == null) {
+            outWarning.setNextWarning(new SQLWarning(driverContext.errorReporter.warningParameterNotSupported(name)));
+            return;
+        }
+        if (textValue == null || textValue.isEmpty()) {
+            return;
+        }
+        Object value;
+        try {
+            value = Objects.requireNonNull(property.getValueParser().apply(textValue));
+        } catch (RuntimeException e) {
+            throw driverContext.errorReporter.errorParameterValueNotSupported(name);
+        }
+        outProperties.put(property, value);
+    }
+
+    private static Logger getParentLogger(Class<?> driverClass) {
+        return Logger.getLogger(driverClass.getPackage().getName());
+    }
+
+    protected static void setupLogging(Class<? extends java.sql.Driver> driverClass) {
+        String logLevel = System.getProperty(driverClass.getPackage().getName() + LOGGING_PROPERTY_SUFFIX);
+        if (logLevel == null) {
+            return;
+        }
+        Level level;
+        try {
+            level = Boolean.TRUE.toString().equals(logLevel) ? Level.ALL : Level.parse(logLevel.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // ignore
+            return;
+        }
+
+        ConsoleHandler ch = new ConsoleHandler();
+        ch.setLevel(level);
+        Logger parentLogger = getParentLogger(driverClass);
+        parentLogger.setLevel(level);
+        parentLogger.addHandler(ch);
+    }
+
+    public boolean acceptsURL(String url) {
+        return url.startsWith(urlScheme);
+    }
+
+    public Connection connect(String url, Properties info) throws SQLException {
+        if (!acceptsURL(url)) {
+            return null;
+        }
+        URI subUri;
+        try {
+            subUri = new URI(url.substring(JDBC_SCHEME.length()));
+        } catch (URISyntaxException e) {
+            throw createErrorReporter().errorParameterValueNotSupported("URL");
+        }
+        String host = subUri.getHost();
+        if (host == null) {
+            throw createErrorReporter().errorParameterValueNotSupported("URL");
+        }
+        int port = subUri.getPort();
+        if (port <= 0) {
+            port = defaultApiPort;
+        }
+
+        String catalog = ADBProtocol.DEFAULT_DATAVERSE;
+        String schema = null;
+        String path = subUri.getPath();
+        if (path != null && path.length() > 1 && path.startsWith("/")) {
+            String[] dataverse = path.substring(1).split("/");
+            switch (dataverse.length) {
+                case 2:
+                    schema = dataverse[1];
+                    // fall thru to 1
+                case 1:
+                    catalog = dataverse[0];
+                    break;
+            }
+        }
+
+        List<NameValuePair> urlParams = URLEncodedUtils.parse(subUri, StandardCharsets.UTF_8);
+
+        ADBDriverContext driverContext = getOrCreateDriverContext();
+        Map<ADBDriverProperty, Object> properties = new HashMap<>();
+        SQLWarning warning = new SQLWarning();
+        parseConnectionProperties(urlParams, info, driverContext, properties, warning);
+        warning = warning.getNextWarning() != null ? warning.getNextWarning() : null;
+
+        ADBProtocol protocol = createProtocol(host, port, properties, driverContext);
+        try {
+            String databaseVersion = protocol.connect();
+            return createConnection(protocol, url, databaseVersion, catalog, schema, warning);
+        } catch (SQLException e) {
+            try {
+                protocol.close();
+            } catch (SQLException e2) {
+                e.addSuppressed(e2);
+            }
+            throw e;
+        }
+    }
+
+    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
+        Collection<ADBDriverProperty> supportedProperties = getOrCreateDriverContext().supportedProperties.values();
+        DriverPropertyInfo[] result = new DriverPropertyInfo[supportedProperties.size()];
+        int i = 0;
+        for (ADBDriverProperty property : supportedProperties) {
+            result[i++] = new DriverPropertyInfo(property.getPropertyName(),
+                    property.getDefaultValue() != null ? property.getDefaultValue().toString() : null);
+        }
+        return result;
+    }
+
+    public int getMajorVersion() {
+        return getOrCreateDriverContext().driverVersion.majorVersion;
+    }
+
+    public int getMinorVersion() {
+        return getOrCreateDriverContext().driverVersion.minorVersion;
+    }
+
+    public boolean jdbcCompliant() {
+        return false;
+    }
+
+    public Logger getParentLogger() {
+        return getParentLogger(getClass());
+    }
+
+    private ADBDriverContext getOrCreateDriverContext() {
+        ADBDriverContext ctx = context;
+        if (ctx == null) {
+            synchronized (this) {
+                ctx = context;
+                if (ctx == null) {
+                    context = ctx = createDriverContext();
+                }
+            }
+        }
+        return ctx;
+    }
+
+    protected ADBDriverContext createDriverContext() {
+        return new ADBDriverContext(getClass(), getDriverSupportedProperties(), createErrorReporter());
+    }
+
+    protected Collection<ADBDriverProperty> getDriverSupportedProperties() {
+        return Arrays.asList(ADBDriverProperty.Common.values());
+    }
+
+    protected ADBErrorReporter createErrorReporter() {
+        return new ADBErrorReporter();
+    }
+
+    protected ADBProtocol createProtocol(String host, int port, Map<ADBDriverProperty, Object> properties,
+            ADBDriverContext driverContext) throws SQLException {
+        return new ADBProtocol(host, port, properties, driverContext);
+    }
+
+    protected ADBConnection createConnection(ADBProtocol protocol, String url, String databaseVersion, String catalog,
+            String schema, SQLWarning connectWarning) {
+        return new ADBConnection(protocol, url, databaseVersion, catalog, schema, connectWarning);
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
new file mode 100644
index 0000000..89f01e9
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.logging.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+final class ADBDriverContext {
+
+    final Class<? extends ADBDriverBase> driverClass;
+
+    final ADBErrorReporter errorReporter;
+
+    final ADBProductVersion driverVersion;
+
+    final Map<String, ADBDriverProperty> supportedProperties;
+
+    final Logger logger;
+
+    final ObjectReader genericObjectReader;
+
+    final ObjectWriter genericObjectWriter;
+
+    final ObjectReader admFormatObjectReader;
+
+    final ObjectWriter admFormatObjectWriter;
+
+    ADBDriverContext(Class<? extends ADBDriverBase> driverClass,
+            Collection<ADBDriverProperty> driverSupportedProperties, ADBErrorReporter errorReporter) {
+        this.driverClass = Objects.requireNonNull(driverClass);
+        this.errorReporter = Objects.requireNonNull(errorReporter);
+        this.logger = Logger.getLogger(driverClass.getName());
+        this.driverVersion = ADBProductVersion.parseDriverVersion(driverClass.getPackage());
+        this.supportedProperties = createPropertyIndexByName(driverSupportedProperties);
+
+        ObjectMapper genericObjectMapper = ADBProtocol.createObjectMapper();
+        this.genericObjectReader = genericObjectMapper.reader();
+        this.genericObjectWriter = genericObjectMapper.writer();
+        ObjectMapper admFormatObjectMapper = createADMFormatObjectMapper();
+        this.admFormatObjectReader = admFormatObjectMapper.reader();
+        this.admFormatObjectWriter = admFormatObjectMapper.writer();
+    }
+
+    protected ObjectMapper createADMFormatObjectMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        SimpleModule serdeModule = new SimpleModule(driverClass.getName());
+        ADBStatement.configureSerialization(serdeModule);
+        ADBRowStore.configureDeserialization(mapper, serdeModule);
+        mapper.registerModule(serdeModule);
+        return mapper;
+    }
+
+    private Map<String, ADBDriverProperty> createPropertyIndexByName(Collection<ADBDriverProperty> properties) {
+        Map<String, ADBDriverProperty> m = new LinkedHashMap<>();
+        for (ADBDriverProperty p : properties) {
+            m.put(p.getPropertyName(), p);
+        }
+        return Collections.unmodifiableMap(m);
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
new file mode 100644
index 0000000..1d1e67e
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+interface ADBDriverProperty {
+
+    String getPropertyName();
+
+    Function<String, ?> getValueParser();
+
+    Object getDefaultValue();
+
+    enum Common implements ADBDriverProperty {
+
+        USER("user", Function.identity(), null),
+        PASSWORD("password", Function.identity(), null),
+        CONNECT_TIMEOUT("connectTimeout", Integer::parseInt, null),
+        SOCKET_TIMEOUT("socketTimeout", Integer::parseInt, null),
+        MAX_WARNINGS("maxWarnings", Integer::parseInt, 10);
+
+        private final String propertyName;
+
+        private final Function<String, ?> valueParser;
+
+        private final Object defaultValue;
+
+        Common(String propertyName, Function<String, ?> valueParser, Object defaultValue) {
+            this.propertyName = Objects.requireNonNull(propertyName);
+            this.valueParser = Objects.requireNonNull(valueParser);
+            this.defaultValue = defaultValue;
+        }
+
+        @Override
+        public String getPropertyName() {
+            return propertyName;
+        }
+
+        public Function<String, ?> getValueParser() {
+            return valueParser;
+        }
+
+        public Object getDefaultValue() {
+            return defaultValue;
+        }
+
+        @Override
+        public String toString() {
+            return getPropertyName();
+        }
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
new file mode 100644
index 0000000..f31e18a
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.IOException;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLInvalidAuthorizationSpecException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.SQLTimeoutException;
+import java.sql.SQLTransientConnectionException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.http.conn.ConnectTimeoutException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+final class ADBErrorReporter {
+
+    private static final List<Class<? extends IOException>> TRANSIENT_CONNECTION_ERRORS =
+            Arrays.asList(java.net.NoRouteToHostException.class, org.apache.http.NoHttpResponseException.class,
+                    org.apache.http.conn.HttpHostConnectException.class);
+
+    protected SQLException errorObjectClosed(Class<?> jdbcInterface) {
+        return new SQLException(String.format("%s is closed", jdbcInterface.getSimpleName()));
+    }
+
+    protected SQLFeatureNotSupportedException errorMethodNotSupported(Class<?> jdbcInterface, String methodName) {
+        return new SQLFeatureNotSupportedException(
+                String.format("Method %s.%s() is not supported", jdbcInterface.getName(), methodName));
+    }
+
+    protected SQLClientInfoException errorClientInfoMethodNotSupported(Class<?> jdbcInterface, String methodName) {
+        return new SQLClientInfoException(
+                String.format("Method %s.%s() is not supported", jdbcInterface.getName(), methodName),
+                Collections.emptyMap());
+    }
+
+    protected SQLException errorParameterNotSupported(String parameterName) {
+        return new SQLException(String.format("Unsupported parameter %s", parameterName));
+    }
+
+    protected String warningParameterNotSupported(String parameterName) {
+        return String.format("Unsupported parameter %s", parameterName);
+    }
+
+    protected SQLException errorParameterValueNotSupported(String parameterName) {
+        return new SQLException(String.format("Unsupported or invalid value of %s parameter", parameterName));
+    }
+
+    protected String warningParameterValueNotSupported(String parameterName) {
+        return String.format("Ignored unsupported or invalid value of %s parameter", parameterName);
+    }
+
+    protected SQLException errorIncompatibleMode(String mode) {
+        return new SQLException(String.format("Operation cannot be performed in %s mode", mode));
+    }
+
+    protected SQLException errorInProtocol() {
+        return new SQLNonTransientConnectionException("Protocol error", SQLState.CONNECTION_FAILURE.code);
+    }
+
+    protected SQLException errorInProtocol(String badValue) {
+        return new SQLNonTransientConnectionException(String.format("Protocol error. Unexpected %s", badValue),
+                SQLState.CONNECTION_FAILURE.code);
+    }
+
+    protected SQLException errorInProtocol(JsonProcessingException e) {
+        return new SQLNonTransientConnectionException(String.format("Protocol error. %s", getMessage(e)),
+                SQLState.CONNECTION_FAILURE.code, e);
+    }
+
+    protected SQLException errorInConnection(String badValue) {
+        return new SQLNonTransientConnectionException(String.format("Connection error. Unexpected %s", badValue),
+                SQLState.CONNECTION_FAILURE.code);
+    }
+
+    protected SQLException errorInConnection(IOException e) {
+        String message = String.format("Connection error. %s", getMessage(e));
+        return e instanceof ConnectTimeoutException ? errorTimeout(message, e)
+                : couldBeTransientConnectionError(e)
+                        ? new SQLTransientConnectionException(message, SQLState.CONNECTION_FAILURE.code, e)
+                        : new SQLNonTransientConnectionException(message, SQLState.CONNECTION_FAILURE.code, e);
+    }
+
+    protected SQLException errorClosingResource(IOException e) {
+        return new SQLException(String.format("Error closing resources. %s", getMessage(e)), e);
+    }
+
+    protected SQLInvalidAuthorizationSpecException errorAuth() {
+        return new SQLInvalidAuthorizationSpecException("Authentication/authorization error",
+                SQLState.CONNECTION_REJECTED.code);
+    }
+
+    protected SQLException errorColumnNotFound(String columnNameOrNumber) {
+        return new SQLException(String.format("Column %s was not found", columnNameOrNumber));
+    }
+
+    protected SQLException errorUnexpectedColumnValue(ADBDatatype type, String columnName) {
+        return new SQLException(
+                String.format("Unexpected value of type %s for column %s", type.getTypeName(), columnName));
+    }
+
+    protected SQLException errorUnwrapTypeMismatch(Class<?> iface) {
+        return new SQLException(String.format("Cannot unwrap to %s", iface.getName()));
+    }
+
+    protected SQLException errorInvalidStatementCategory() {
+        return new SQLException("Invalid statement category");
+    }
+
+    protected SQLException errorUnexpectedType(Class<?> type) {
+        return new SQLException(String.format("Unexpected type %s", type.getName()), SQLState.INVALID_DATE_TYPE.code);
+    }
+
+    protected SQLException errorUnexpectedType(byte typeTag) {
+        return new SQLException(String.format("Unexpected type %s", typeTag), SQLState.INVALID_DATE_TYPE.code);
+    }
+
+    protected SQLException errorUnexpectedType(ADBDatatype type) {
+        return new SQLException(String.format("Unexpected type %s", type.getTypeName()),
+                SQLState.INVALID_DATE_TYPE.code);
+    }
+
+    protected SQLException errorInvalidValueOfType(ADBDatatype type) {
+        return new SQLException(String.format("Invalid value of type %s", type), SQLState.INVALID_DATE_TYPE.code);
+    }
+
+    protected SQLException errorNoResult() {
+        return new SQLException("Result is unavailable");
+    }
+
+    protected SQLException errorBadResultSignature() {
+        return new SQLException("Cannot infer result columns");
+    }
+
+    protected SQLException errorNoCurrentRow() {
+        return new SQLException("No current row", SQLState.INVALID_CURSOR_POSITION.code);
+    }
+
+    protected SQLException errorInRequestGeneration(IOException e) {
+        return new SQLException(String.format("Cannot create request. %s", getMessage(e)), e);
+    }
+
+    protected SQLException errorInResultHandling(IOException e) {
+        return new SQLException(String.format("Cannot reading result. %s", getMessage(e)), e);
+    }
+
+    protected SQLTimeoutException errorTimeout() {
+        return new SQLTimeoutException();
+    }
+
+    protected SQLTimeoutException errorTimeout(String message, IOException cause) {
+        return new SQLTimeoutException(message, cause);
+    }
+
+    protected boolean couldBeTransientConnectionError(IOException e) {
+        if (e != null) {
+            for (Class<? extends IOException> c : TRANSIENT_CONNECTION_ERRORS) {
+                if (c.isInstance(e)) {
+                    return true;
+                }
+            }
+
+        }
+        return false;
+    }
+
+    protected String getMessage(Exception e) {
+        String message = e != null ? e.getMessage() : null;
+        return message != null ? message : "";
+    }
+
+    public enum SQLState {
+        CONNECTION_REJECTED("08004"),
+        CONNECTION_FAILURE("08006"),
+        INVALID_DATE_TYPE("HY004"),
+        INVALID_CURSOR_POSITION("HY108");
+
+        private final String code;
+
+        SQLState(String code) {
+            this.code = Objects.requireNonNull(code);
+        }
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
new file mode 100644
index 0000000..7fa8127
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.DatabaseMetaData;
+import java.sql.JDBCType;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ADBMetaStatement extends ADBStatement {
+
+    public static final String SCHEMALESS = "SCHEMALESS";
+    public static final String TABLE = "TABLE";
+    public static final String VIEW = "VIEW";
+
+    protected ADBMetaStatement(ADBConnection connection) {
+        super(connection, null, null);
+    }
+
+    ADBResultSet executeGetCatalogsQuery() throws SQLException {
+        checkClosed();
+
+        StringBuilder sql = new StringBuilder(256);
+
+        sql.append("select TABLE_CAT ");
+        sql.append("from Metadata.`Dataverse` ");
+        sql.append("let name = decode_dataverse_name(DataverseName), ");
+        sql.append("TABLE_CAT = name[0] ");
+        sql.append("where array_length(name) between 1 and 2 ");
+        sql.append("group by TABLE_CAT ");
+        sql.append("order by TABLE_CAT");
+
+        return executeQueryImpl(sql.toString(), null);
+    }
+
+    ADBResultSet executeGetSchemasQuery(String catalog, String schemaPattern) throws SQLException {
+        checkClosed();
+
+        StringBuilder sql = new StringBuilder(512);
+        sql.append("select TABLE_SCHEM, TABLE_CATALOG ");
+        sql.append("from Metadata.`Dataverse` ");
+        sql.append("let name = decode_dataverse_name(DataverseName), ");
+        sql.append("TABLE_CATALOG = name[0], ");
+        sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
+        sql.append("where array_length(name) between 1 and 2 ");
+        if (catalog != null) {
+            sql.append("and TABLE_CATALOG = $1 ");
+        }
+        if (schemaPattern != null) {
+            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+        }
+        sql.append("order by TABLE_CATALOG, TABLE_SCHEM");
+
+        return executeQueryImpl(sql.toString(), Arrays.asList(catalog, schemaPattern));
+    }
+
+    ADBResultSet executeGetTablesQuery(String catalog, String schemaPattern, String tableNamePattern, String[] types)
+            throws SQLException {
+        checkClosed();
+
+        String datasetTermTabular = getDatasetTerm(true);
+        String datasetTermNonTabular = getDatasetTerm(false);
+        String viewTermTabular = getViewTerm(true);
+        String viewTermNonTabular = getViewTerm(false);
+
+        StringBuilder sql = new StringBuilder(1024);
+        sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, null REMARKS, null TYPE_CAT, ");
+        sql.append("null TYPE_SCHEM, null TYPE_NAME, null SELF_REFERENCING_COL_NAME, null REF_GENERATION ");
+        sql.append("from Metadata.`Dataset` ds join Metadata.`Datatype` dt ");
+        sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
+        sql.append("isDataset = (ds.DatasetType = 'INTERNAL' or ds.DatasetType = 'EXTERNAL'), ");
+        sql.append("isView = ds.DatasetType = 'VIEW', ");
+        sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
+        sql.append("TABLE_CAT = dvname[0], ");
+        sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+        sql.append("TABLE_NAME = ds.DatasetName, ");
+        sql.append("TABLE_TYPE = case ");
+        sql.append("when isDataset then (case when hasFields then '").append(datasetTermTabular).append("' else '")
+                .append(datasetTermNonTabular).append("' end) ");
+        sql.append("when isView then (case when hasFields then '").append(viewTermTabular).append("' else '")
+                .append(viewTermNonTabular).append("' end) ");
+        sql.append("else null end ");
+        sql.append("where array_length(dvname) between 1 and 2 ");
+        if (catalog != null) {
+            sql.append("and TABLE_CAT = $1 ");
+        }
+        if (schemaPattern != null) {
+            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+        }
+        if (tableNamePattern != null) {
+            sql.append("and TABLE_NAME like $3 ");
+        }
+        sql.append("and TABLE_TYPE ").append(types != null ? "in $4" : "is not null").append(" ");
+        sql.append("order by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME");
+
+        return executeQueryImpl(sql.toString(),
+                Arrays.asList(catalog, schemaPattern, tableNamePattern, types != null ? Arrays.asList(types) : null));
+    }
+
+    ADBResultSet executeGetColumnsQuery(String catalog, String schemaPattern, String tableNamePattern,
+            String columnNamePattern) throws SQLException {
+        checkClosed();
+
+        StringBuilder sql = new StringBuilder(2048);
+        sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE, TYPE_NAME, COLUMN_SIZE, ");
+        sql.append("1 BUFFER_LENGTH, null DECIMAL_DIGITS, 2 NUM_PREC_RADIX, NULLABLE, ");
+        sql.append("null REMARKS, null COLUMN_DEF, DATA_TYPE SQL_DATA_TYPE,");
+        sql.append("0 SQL_DATETIME_SUB, COLUMN_SIZE CHAR_OCTET_LENGTH, ORDINAL_POSITION, ");
+        sql.append("case NULLABLE when 0 then 'NO' else 'YES' end IS_NULLABLE, ");
+        sql.append("null SCOPE_CATALOG, null SCOPE_SCHEMA, null SCOPE_TABLE, null SOURCE_DATA_TYPE, ");
+        sql.append("'NO' IS_AUTOINCREMENT, 'NO' IS_GENERATEDCOLUMN ");
+        sql.append("from Metadata.`Dataset` ds ");
+        sql.append("join Metadata.`Datatype` dt ");
+        sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        sql.append("unnest dt.Derived.Record.Fields as field at fieldpos ");
+        sql.append("left join Metadata.`Datatype` dt2 ");
+        sql.append(
+                "on field.FieldType = dt2.DatatypeName and ds.DataverseName = dt2.DataverseName and dt2.Derived is known ");
+        sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
+        sql.append("TABLE_CAT = dvname[0], ");
+        sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+        sql.append("TABLE_NAME = ds.DatasetName, ");
+        sql.append("COLUMN_NAME = field.FieldName, ");
+        sql.append("TYPE_NAME = case ");
+        for (ADBDatatype nestedType : new ADBDatatype[] { ADBDatatype.OBJECT, ADBDatatype.ARRAY,
+                ADBDatatype.MULTISET }) {
+            sql.append(String.format("when dt2.Derived.%s is known then '%s' ",
+                    ADBDatatype.getDerivedRecordName(nestedType), nestedType.getTypeName()));
+        }
+        sql.append("else field.FieldType end, ");
+        sql.append("DATA_TYPE = ");
+        sql.append("case TYPE_NAME ");
+        for (ADBDatatype type : ADBDatatype.values()) {
+            JDBCType jdbcType = type.getJdbcType();
+            if (type.isNullOrMissing() || jdbcType.equals(JDBCType.OTHER)) {
+                // will be handled by the 'else' clause
+                continue;
+            }
+            sql.append("when '").append(type.getTypeName()).append("' ");
+            sql.append("then ").append(jdbcType.getVendorTypeNumber()).append(" ");
+        }
+        sql.append("else ").append(JDBCType.OTHER.getVendorTypeNumber()).append(" end, ");
+
+        sql.append("COLUMN_SIZE = case field.FieldType when 'string' then 32767 else 8 end, "); // TODO:based on type
+        sql.append("ORDINAL_POSITION = fieldpos, ");
+        sql.append("NULLABLE = case when field.IsNullable or field.IsMissable then 1 else 0 end ");
+        sql.append("where array_length(dvname) between 1 and 2 ");
+
+        sql.append("and array_length(dt.Derived.Record.Fields) > 0 ");
+        if (catalog != null) {
+            sql.append("and TABLE_CAT = $1 ");
+        }
+        if (schemaPattern != null) {
+            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+        }
+        if (tableNamePattern != null) {
+            sql.append("and TABLE_NAME like $3 ");
+        }
+        if (columnNamePattern != null) {
+            sql.append("and COLUMN_NAME like $4 ");
+        }
+        sql.append("order by TABLE_CAT, TABLE_SCHEM, TABLE_NAME, ORDINAL_POSITION");
+
+        return executeQueryImpl(sql.toString(),
+                Arrays.asList(catalog, schemaPattern, tableNamePattern, columnNamePattern));
+    }
+
+    ADBResultSet executeGetPrimaryKeysQuery(String catalog, String schema, String table) throws SQLException {
+        checkClosed();
+
+        StringBuilder sql = new StringBuilder(1024);
+        sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, null PK_NAME ");
+        sql.append("from Metadata.`Dataset` ds unnest ds.InternalDetails.PrimaryKey pki at pkipos ");
+        sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
+        sql.append("TABLE_CAT = dvname[0], ");
+        sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+        sql.append("TABLE_NAME = ds.DatasetName, ");
+        sql.append("COLUMN_NAME = pki[0], ");
+        sql.append("KEY_SEQ = pkipos ");
+        sql.append("where array_length(dvname) between 1 and 2 ");
+        sql.append("and (every pk in ds.InternalDetails.PrimaryKey satisfies array_length(pk) = 1 end) ");
+        sql.append("and (every si in ds.InternalDetails.KeySourceIndicator satisfies si = 0 end ) ");
+        if (catalog != null) {
+            sql.append("and TABLE_CAT = $1 ");
+        }
+        if (schema != null) {
+            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+        }
+        if (table != null) {
+            sql.append("and TABLE_NAME like $3 ");
+        }
+        sql.append("order by COLUMN_NAME");
+
+        return executeQueryImpl(sql.toString(), Arrays.asList(catalog, schema, table));
+    }
+
+    ADBResultSet executeGetTableTypesQuery() throws SQLException {
+        checkClosed();
+
+        LinkedHashSet<String> tableTypes = new LinkedHashSet<>();
+        tableTypes.add(getDatasetTerm(true));
+        tableTypes.add(getDatasetTerm(false));
+        tableTypes.add(getViewTerm(true));
+        tableTypes.add(getViewTerm(false));
+
+        List<ADBColumn> columns = Collections.singletonList(new ADBColumn("TABLE_TYPE", ADBDatatype.STRING, false));
+
+        AbstractValueSerializer stringSer = getADMFormatSerializer(String.class);
+        ArrayNode result = (ArrayNode) connection.protocol.driverContext.genericObjectReader.createArrayNode();
+        for (String tableType : tableTypes) {
+            result.addObject().put("TABLE_TYPE", stringSer.serializeToString(tableType));
+        }
+
+        return createSystemResultSet(columns, result);
+    }
+
+    ADBResultSet executeGetTypeInfoQuery() throws SQLException {
+        checkClosed();
+
+        AbstractValueSerializer int16Ser = getADMFormatSerializer(Short.class);
+        AbstractValueSerializer int32Ser = getADMFormatSerializer(Integer.class);
+        AbstractValueSerializer stringSer = getADMFormatSerializer(String.class);
+
+        List<ADBColumn> columns = new ArrayList<>();
+        columns.add(new ADBColumn("TYPE_NAME", ADBDatatype.STRING, false));
+        columns.add(new ADBColumn("DATA_TYPE", ADBDatatype.INTEGER, false));
+        columns.add(new ADBColumn("PRECISION", ADBDatatype.INTEGER, true));
+        columns.add(new ADBColumn("LITERAL_PREFIX", ADBDatatype.STRING, true));
+        columns.add(new ADBColumn("LITERAL_SUFFIX", ADBDatatype.STRING, true));
+        columns.add(new ADBColumn("CREATE_PARAMS", ADBDatatype.STRING, true));
+        columns.add(new ADBColumn("NULLABLE", ADBDatatype.SMALLINT, true));
+        columns.add(new ADBColumn("CASE_SENSITIVE", ADBDatatype.BOOLEAN, true));
+        columns.add(new ADBColumn("SEARCHABLE", ADBDatatype.SMALLINT, true));
+        columns.add(new ADBColumn("UNSIGNED_ATTRIBUTE", ADBDatatype.BOOLEAN, true));
+        columns.add(new ADBColumn("FIXED_PREC_SCALE", ADBDatatype.BOOLEAN, true));
+        columns.add(new ADBColumn("AUTO_INCREMENT", ADBDatatype.BOOLEAN, true));
+        columns.add(new ADBColumn("LOCAL_TYPE_NAME", ADBDatatype.STRING, true));
+        columns.add(new ADBColumn("MINIMUM_SCALE", ADBDatatype.SMALLINT, true));
+        columns.add(new ADBColumn("MAXIMUM_SCALE", ADBDatatype.SMALLINT, true));
+        columns.add(new ADBColumn("SQL_DATA_TYPE", ADBDatatype.INTEGER, true));
+        columns.add(new ADBColumn("SQL_DATETIME_SUB", ADBDatatype.INTEGER, true));
+        columns.add(new ADBColumn("NUM_PREC_RADIX", ADBDatatype.INTEGER, true));
+
+        ArrayNode result = (ArrayNode) connection.protocol.driverContext.genericObjectReader.createArrayNode();
+        populateTypeInfo(result.addObject(), ADBDatatype.BOOLEAN, 1, null, null, null, null, null, null, int16Ser,
+                int32Ser, stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.TINYINT, 3, 10, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.SMALLINT, 5, 10, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.INTEGER, 10, 10, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.BIGINT, 19, 10, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.FLOAT, 7, 2, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.DOUBLE, 15, 2, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.DATE, 32, null, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer); // TODO:precision
+        populateTypeInfo(result.addObject(), ADBDatatype.TIME, 32, null, 0, 0, false, null, null, int16Ser, int32Ser,
+                stringSer); // TODO:precision
+        populateTypeInfo(result.addObject(), ADBDatatype.DATETIME, 32, null, 0, 0, false, null, null, int16Ser,
+                int32Ser, stringSer); // TODO:precision
+        populateTypeInfo(result.addObject(), ADBDatatype.YEARMONTHDURATION, 32, null, 0, 0, false, null, null, int16Ser,
+                int32Ser, stringSer); // TODO:precision
+        populateTypeInfo(result.addObject(), ADBDatatype.DAYTIMEDURATION, 32, null, 0, 0, false, null, null, int16Ser,
+                int32Ser, stringSer); // TODO:precision
+        populateTypeInfo(result.addObject(), ADBDatatype.DURATION, 32, null, 0, 0, false, null, null, int16Ser,
+                int32Ser, stringSer); // TODO:precision
+        populateTypeInfo(result.addObject(), ADBDatatype.STRING, 32767, null, null, null, true, "'", "'", int16Ser,
+                int32Ser, stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.ARRAY, 32767, null, 0, 0, false, null, null, int16Ser,
+                int32Ser, stringSer);
+        populateTypeInfo(result.addObject(), ADBDatatype.OBJECT, 32767, null, 0, 0, false, null, null, int16Ser,
+                int32Ser, stringSer);
+
+        return createSystemResultSet(columns, result);
+    }
+
+    private void populateTypeInfo(ObjectNode typeInfo, ADBDatatype type, int precision, Integer precisionRadix,
+            Integer minScale, Integer maxScale, Boolean searchable, String literalPrefix, String literalSuffix,
+            ADBPreparedStatement.AbstractValueSerializer int16Ser,
+            ADBPreparedStatement.AbstractValueSerializer int32Ser,
+            ADBPreparedStatement.AbstractValueSerializer stringSer) {
+        typeInfo.put("TYPE_NAME", stringSer.serializeToString(type.getTypeName()));
+        typeInfo.put("DATA_TYPE", int32Ser.serializeToString(type.getJdbcType().getVendorTypeNumber()));
+        typeInfo.put("PRECISION", int32Ser.serializeToString(precision));
+        typeInfo.put("LITERAL_PREFIX", literalPrefix != null ? stringSer.serializeToString(literalPrefix) : null);
+        typeInfo.put("LITERAL_SUFFIX", literalSuffix != null ? stringSer.serializeToString(literalSuffix) : null);
+        typeInfo.putNull("CREATE_PARAMS");
+        typeInfo.put("NULLABLE", int16Ser.serializeToString((short) DatabaseMetaData.typeNullable));
+        typeInfo.put("CASE_SENSITIVE", false);
+        typeInfo.put("SEARCHABLE",
+                int16Ser.serializeToString((short) (searchable == null ? DatabaseMetaData.typePredNone
+                        : searchable ? DatabaseMetaData.typeSearchable : DatabaseMetaData.typePredBasic)));
+        typeInfo.put("UNSIGNED_ATTRIBUTE", false);
+        typeInfo.put("FIXED_PREC_SCALE", false);
+        typeInfo.putNull("AUTO_INCREMENT");
+        typeInfo.putNull("LOCAL_TYPE_NAME");
+        typeInfo.put("MINIMUM_SCALE", minScale != null ? int16Ser.serializeToString(minScale.shortValue()) : null);
+        typeInfo.put("MAXIMUM_SCALE", maxScale != null ? int16Ser.serializeToString(maxScale.shortValue()) : null);
+        typeInfo.put("SQL_DATA_TYPE", int32Ser.serializeToString(type.getTypeTag()));
+        typeInfo.putNull("SQL_DATETIME_SUB");
+        typeInfo.put("NUM_PREC_RADIX", int32Ser.serializeToString(precisionRadix != null ? precisionRadix : 10));
+    }
+
+    ADBResultSet executeEmptyResultQuery() throws SQLException {
+        checkClosed();
+        return createEmptyResultSet();
+    }
+
+    @Override
+    ADBStatement getResultSetStatement(ADBResultSet rs) {
+        return null;
+    }
+
+    protected String getDatasetTerm(boolean tabular) {
+        return tabular ? TABLE : SCHEMALESS + " " + TABLE;
+    }
+
+    protected String getViewTerm(boolean tabular) {
+        return tabular ? VIEW : SCHEMALESS + " " + VIEW;
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBParameterMetaData.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBParameterMetaData.java
new file mode 100644
index 0000000..d612b3f
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBParameterMetaData.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.ParameterMetaData;
+import java.sql.Types;
+import java.util.Objects;
+
+final class ADBParameterMetaData extends ADBWrapperSupport implements ParameterMetaData {
+
+    final ADBPreparedStatement statement;
+
+    final int parameterCount;
+
+    public ADBParameterMetaData(ADBPreparedStatement statement, int parameterCount) {
+        this.statement = Objects.requireNonNull(statement);
+        this.parameterCount = parameterCount;
+    }
+
+    @Override
+    public int getParameterCount() {
+        return parameterCount;
+    }
+
+    @Override
+    public int getParameterMode(int parameterIndex) {
+        return parameterModeIn;
+    }
+
+    @Override
+    public int getParameterType(int parameterIndex) {
+        return Types.OTHER; // any
+    }
+
+    @Override
+    public String getParameterTypeName(int parameterIndex) {
+        return "";
+    }
+
+    @Override
+    public String getParameterClassName(int parameterIndex) {
+        return Object.class.getName();
+    }
+
+    @Override
+    public int isNullable(int parameterIndex) {
+        return parameterNullable;
+    }
+
+    @Override
+    public boolean isSigned(int parameterIndex) {
+        return false;
+    }
+
+    @Override
+    public int getPrecision(int parameterIndex) {
+        return 0;
+    }
+
+    @Override
+    public int getScale(int parameterIndex) {
+        return 0;
+    }
+
+    @Override
+    protected ADBErrorReporter getErrorReporter() {
+        return statement.getErrorReporter();
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
new file mode 100644
index 0000000..98129fd
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLType;
+import java.sql.SQLXML;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+
+final class ADBPreparedStatement extends ADBStatement implements PreparedStatement {
+
+    final String sql;
+
+    final List<Object> args;
+
+    final List<ADBColumn> resultColumns;
+
+    ADBPreparedStatement(ADBConnection connection, String sql, String catalog, String schema) throws SQLException {
+        super(connection, catalog, schema);
+        // TODO:timeout
+        ADBProtocol.QueryServiceResponse response =
+                connection.protocol.submitStatement(sql, null, false, true, 0, catalog, schema);
+        int parameterCount = connection.protocol.getStatementParameterCount(response);
+        boolean isQuery = connection.protocol.isStatementCategory(response,
+                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+        List<ADBColumn> columns = isQuery ? connection.protocol.getColumns(response) : Collections.emptyList();
+        this.sql = sql;
+        this.args = Arrays.asList(new Object[parameterCount]);
+        this.resultColumns = columns;
+    }
+
+    // Metadata
+
+    @Override
+    public ParameterMetaData getParameterMetaData() throws SQLException {
+        checkClosed();
+        return new ADBParameterMetaData(this, args.size());
+    }
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        checkClosed();
+        return new ADBResultSetMetaData(this, resultColumns);
+    }
+
+    // Execution
+
+    @Override
+    public ResultSet executeQuery() throws SQLException {
+        checkClosed();
+        return executeQueryImpl(sql, args);
+    }
+
+    @Override
+    public ADBResultSet executeQuery(String sql) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeQuery");
+    }
+
+    @Override
+    public long executeLargeUpdate() throws SQLException {
+        return executeUpdateImpl(sql, args);
+    }
+
+    @Override
+    public int executeUpdate() throws SQLException {
+        return executeUpdateImpl(sql, args);
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeUpdate");
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeUpdate");
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeUpdate");
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "executeUpdate");
+    }
+
+    @Override
+    public boolean execute() throws SQLException {
+        return executeImpl(sql, args);
+    }
+
+    @Override
+    public boolean execute(String sql) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "execute");
+    }
+
+    @Override
+    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "execute");
+    }
+
+    @Override
+    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "execute");
+    }
+
+    @Override
+    public boolean execute(String sql, String[] columnNames) throws SQLException {
+        // Prohibited on PreparedStatement
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "execute");
+    }
+
+    @Override
+    public void addBatch() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "addBatch");
+    }
+
+    @Override
+    public void setEscapeProcessing(boolean enable) throws SQLException {
+        checkClosed();
+        // ignore as the statement has already been parsed
+    }
+
+    // Parameter bindings
+
+    @Override
+    public void clearParameters() throws SQLException {
+        checkClosed();
+        for (int i = 0, n = args.size(); i < n; i++) {
+            args.set(i, null);
+        }
+    }
+
+    private int argIndex(int parameterIndex) throws SQLException {
+        boolean ok = 0 < parameterIndex && parameterIndex <= args.size();
+        if (!ok) {
+            throw getErrorReporter().errorParameterValueNotSupported("parameterIndex");
+        }
+        return parameterIndex - 1;
+    }
+
+    private void setArg(int parameterIndex, Object v) throws SQLException {
+        args.set(argIndex(parameterIndex), v);
+    }
+
+    // Basic types
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, null);
+    }
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+        checkClosed();
+        setNull(parameterIndex, sqlType);
+    }
+
+    @Override
+    public void setBoolean(int parameterIndex, boolean v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setByte(int parameterIndex, byte v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setShort(int parameterIndex, short v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setInt(int parameterIndex, int v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setLong(int parameterIndex, long v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setFloat(int parameterIndex, float v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setDouble(int parameterIndex, double v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setBigDecimal(int parameterIndex, BigDecimal v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setString(int parameterIndex, String v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setNString(int parameterIndex, String v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setDate(int parameterIndex, java.sql.Date v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setDate(int parameterIndex, java.sql.Date v, Calendar cal) throws SQLException {
+        checkClosed();
+        setDate(parameterIndex, v);
+    }
+
+    @Override
+    public void setTime(int parameterIndex, java.sql.Time v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setTime(int parameterIndex, java.sql.Time v, Calendar cal) throws SQLException {
+        checkClosed();
+        setTime(parameterIndex, v);
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, java.sql.Timestamp v) throws SQLException {
+        checkClosed();
+        setArg(parameterIndex, v);
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, java.sql.Timestamp v, Calendar cal) throws SQLException {
+        checkClosed();
+        setTimestamp(parameterIndex, v);
+    }
+
+    // Generic (setObject)
+
+    @Override
+    public void setObject(int parameterIndex, Object v) throws SQLException {
+        checkClosed();
+        if (v == null || isSetObjectCompatible(v.getClass())) {
+            setArg(parameterIndex, v);
+        } else {
+            throw getErrorReporter().errorParameterValueNotSupported("object");
+        }
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object v, int targetSqlType) throws SQLException {
+        setObject(parameterIndex, v); // TODO:revisit
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object v, SQLType targetSqlType) throws SQLException {
+        setObject(parameterIndex, v, targetSqlType.getVendorTypeNumber()); // TODO:revisit
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object v, int targetSqlType, int scaleOrLength) throws SQLException {
+        setObject(parameterIndex, v, targetSqlType); // TODO:revisit
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object v, SQLType targetSqlType, int scaleOrLength) throws SQLException {
+        setObject(parameterIndex, v, targetSqlType.getVendorTypeNumber()); // TODO:revisit
+    }
+
+    // Unsupported
+
+    @Override
+    public void setBytes(int parameterIndex, byte[] v) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBytes");
+    }
+
+    @Override
+    public void setRef(int parameterIndex, Ref x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setRef");
+    }
+
+    @Override
+    public void setRowId(int parameterIndex, RowId x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setRowId");
+    }
+
+    @Override
+    public void setURL(int parameterIndex, URL v) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setURL");
+    }
+
+    // Unsupported - streams
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream v) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setAsciiStream");
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setAsciiStream");
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream v, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setAsciiStream");
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBinaryStream");
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBinaryStream");
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBinaryStream");
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setCharacterStream");
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setCharacterStream");
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setCharacterStream");
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setNCharacterStream");
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setNCharacterStream");
+    }
+
+    @Override
+    public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setUnicodeStream");
+    }
+
+    // Unsupported - LOB, Array, SQLXML
+
+    @Override
+    public void setArray(int parameterIndex, Array x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setArray");
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, Blob x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBlob");
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBlob");
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setBlob");
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Clob x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setClob");
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setClob");
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setClob");
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, NClob value) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setNClob");
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setNClob");
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setNClob");
+    }
+
+    @Override
+    public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(PreparedStatement.class, "setSQLXML");
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProductVersion.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProductVersion.java
new file mode 100644
index 0000000..5255d75
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProductVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+final class ADBProductVersion {
+
+    public static final String ASTERIXDB = "Apache AsterixDB";
+
+    private static final Pattern DATABASE_VERSION_PATTERN =
+            Pattern.compile("(?<name>[^/]+)(?:/(?<ver>(?:(?<vermj>\\d+)(?:\\.(?<vermn>\\d+))?)?.*))?");
+
+    final String productName;
+
+    final String productVersion;
+
+    final int majorVersion;
+
+    final int minorVersion;
+
+    private ADBProductVersion(String productName, String productVersion, int majorVersion, int minorVersion) {
+        this.productName = productName != null ? productName : ASTERIXDB;
+        this.productVersion = productVersion != null ? productVersion : majorVersion + "." + minorVersion;
+        this.majorVersion = majorVersion;
+        this.minorVersion = minorVersion;
+    }
+
+    static ADBProductVersion parseDriverVersion(Package driverPackage) {
+        int majorVersion = 0, minorVersion = 0;
+        String productName = driverPackage.getImplementationTitle();
+        if (productName == null) {
+            productName = ASTERIXDB;
+        }
+        String productVersion = driverPackage.getImplementationVersion();
+        if (productVersion != null) {
+            String[] v = productVersion.split("\\.");
+            try {
+                majorVersion = Integer.parseInt(v[0]);
+                if (v.length > 1) {
+                    minorVersion = Integer.parseInt(v[1]);
+                }
+            } catch (NumberFormatException e) {
+                // ignore
+            }
+        }
+        return new ADBProductVersion(productName, productVersion, majorVersion, minorVersion);
+    }
+
+    static ADBProductVersion parseDatabaseVersion(String serverVersion) {
+        String dbProductName = null;
+        String dbProductVersion = null;
+        int dbMajorVersion = 0;
+        int dbMinorVersion = 0;
+        if (serverVersion != null) {
+            Matcher m = DATABASE_VERSION_PATTERN.matcher(serverVersion);
+            if (m.matches()) {
+                dbProductName = m.group("name");
+                dbProductVersion = m.group("ver");
+                String vermj = m.group("vermj");
+                String vermn = m.group("vermn");
+                if (vermj != null) {
+                    try {
+                        dbMajorVersion = Integer.parseInt(vermj);
+                    } catch (NumberFormatException e) {
+                        // ignore (overflow)
+                    }
+                }
+                if (vermn != null) {
+                    try {
+                        dbMinorVersion = Integer.parseInt(vermn);
+                    } catch (NumberFormatException e) {
+                        // ignore (overflow)
+                    }
+                }
+            }
+        }
+        return new ADBProductVersion(dbProductName, dbProductVersion, dbMajorVersion, dbMinorVersion);
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
new file mode 100644
index 0000000..e8a36f5
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.AuthCache;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.entity.ContentProducer;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.EntityTemplate;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicNameValuePair;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+public class ADBProtocol {
+
+    private static final String QUERY_ENDPOINT_PATH = "/query/service";
+    private static final String QUERY_RESULT_ENDPOINT_PATH = "/query/service/result";
+
+    private static final String STATEMENT = "statement";
+    private static final String ARGS = "args";
+    private static final String MODE = "mode";
+    private static final String READ_ONLY = "readonly";
+    private static final String DATAVERSE = "dataverse";
+    private static final String TIMEOUT = "timeout";
+    private static final String SIGNATURE = "signature";
+    private static final String COMPILE_ONLY = "compile-only";
+    private static final String CLIENT_TYPE = "client-type";
+    private static final String PLAN_FORMAT = "plan-format";
+    private static final String MAX_WARNINGS = "max-warnings";
+
+    private static final String MODE_DEFERRED = "deferred";
+    private static final String CLIENT_TYPE_JDBC = "jdbc";
+    private static final String RESULTS = "results";
+    private static final String FORMAT_LOSSLESS_ADM = "lossless-adm";
+    private static final String PLAN_FORMAT_STRING = "string";
+
+    private static final String OPTIONAL_TYPE_SUFFIX = "?";
+    static final char TEXT_DELIMITER = ':';
+    static final String EXPLAIN_ONLY_RESULT_COLUMN_NAME = "$1";
+    static final String DEFAULT_DATAVERSE = "Default";
+
+    final ADBDriverContext driverContext;
+    final HttpClientConnectionManager httpConnectionManager;
+    final HttpClientContext httpClientContext;
+    final CloseableHttpClient httpClient;
+    final URI queryEndpoint;
+    final URI queryResultEndpoint;
+    final String user;
+    final int maxWarnings;
+
+    protected ADBProtocol(String host, int port, Map<ADBDriverProperty, Object> params, ADBDriverContext driverContext)
+            throws SQLException {
+        URI queryEndpoint = createEndpointUri(host, port, QUERY_ENDPOINT_PATH, driverContext.errorReporter);
+        URI queryResultEndpoint =
+                createEndpointUri(host, port, QUERY_RESULT_ENDPOINT_PATH, driverContext.errorReporter);
+        PoolingHttpClientConnectionManager httpConnectionManager = new PoolingHttpClientConnectionManager();
+        int maxConnections = Math.max(16, Runtime.getRuntime().availableProcessors());
+        httpConnectionManager.setDefaultMaxPerRoute(maxConnections);
+        httpConnectionManager.setMaxTotal(maxConnections);
+        SocketConfig.Builder socketConfigBuilder = null;
+        Number socketTimeoutMillis = (Number) params.get(ADBDriverProperty.Common.SOCKET_TIMEOUT);
+        if (socketTimeoutMillis != null) {
+            socketConfigBuilder = SocketConfig.custom();
+            socketConfigBuilder.setSoTimeout(socketTimeoutMillis.intValue());
+        }
+        if (socketConfigBuilder != null) {
+            httpConnectionManager.setDefaultSocketConfig(socketConfigBuilder.build());
+        }
+        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+        Number connectTimeoutMillis = (Number) params.get(ADBDriverProperty.Common.CONNECT_TIMEOUT);
+        if (connectTimeoutMillis != null) {
+            requestConfigBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue());
+            requestConfigBuilder.setConnectTimeout(connectTimeoutMillis.intValue());
+        }
+        if (socketTimeoutMillis != null) {
+            requestConfigBuilder.setSocketTimeout(socketTimeoutMillis.intValue());
+        }
+        RequestConfig requestConfig = requestConfigBuilder.build();
+
+        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+        httpClientBuilder.setConnectionManager(httpConnectionManager);
+        httpClientBuilder.setConnectionManagerShared(true);
+        httpClientBuilder.setDefaultRequestConfig(requestConfig);
+        String user = (String) params.get(ADBDriverProperty.Common.USER);
+        if (user != null) {
+            String password = (String) params.get(ADBDriverProperty.Common.PASSWORD);
+            httpClientBuilder.setDefaultCredentialsProvider(createCredentialsProvider(user, password));
+        }
+
+        Number maxWarnings = ((Number) params.getOrDefault(ADBDriverProperty.Common.MAX_WARNINGS,
+                ADBDriverProperty.Common.MAX_WARNINGS.getDefaultValue()));
+
+        this.user = user;
+        this.queryEndpoint = queryEndpoint;
+        this.queryResultEndpoint = queryResultEndpoint;
+        this.httpConnectionManager = httpConnectionManager;
+        this.httpClient = httpClientBuilder.build();
+        this.httpClientContext = createHttpClientContext(queryEndpoint);
+        this.driverContext = Objects.requireNonNull(driverContext);
+        this.maxWarnings = Math.max(maxWarnings.intValue(), 0);
+    }
+
+    private static URI createEndpointUri(String host, int port, String path, ADBErrorReporter errorReporter)
+            throws SQLException {
+        try {
+            return new URI("http", null, host, port, path, null, null);
+        } catch (URISyntaxException e) {
+            throw errorReporter.errorParameterValueNotSupported("endpoint " + host + ":" + port);
+        }
+    }
+
+    private static CredentialsProvider createCredentialsProvider(String user, String password) {
+        CredentialsProvider cp = new BasicCredentialsProvider();
+        cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
+        return cp;
+    }
+
+    void close() throws SQLException {
+        try {
+            httpClient.close();
+        } catch (IOException e) {
+            throw getErrorReporter().errorClosingResource(e);
+        } finally {
+            httpConnectionManager.shutdown();
+        }
+    }
+
+    String connect() throws SQLException {
+        String databaseVersion = pingImpl(-1, true); // TODO:review timeout
+        if (getLogger().isLoggable(Level.FINE)) {
+            getLogger().log(Level.FINE, String.format("connected to '%s' at %s", databaseVersion, queryEndpoint));
+        }
+        return databaseVersion;
+    }
+
+    boolean ping(int timeoutSeconds) {
+        try {
+            pingImpl(timeoutSeconds, false);
+            return true;
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    private String pingImpl(int timeoutSeconds, boolean fetchDatabaseVersion) throws SQLException {
+        //TODO: support timeoutSeconds: -1 = use default, 0 = indefinite ?
+        HttpOptions httpOptions = new HttpOptions(queryEndpoint);
+        try (CloseableHttpResponse response = httpClient.execute(httpOptions, httpClientContext)) {
+            int statusCode = response.getStatusLine().getStatusCode();
+            switch (statusCode) {
+                case HttpStatus.SC_OK:
+                    String databaseVersion = null;
+                    if (fetchDatabaseVersion) {
+                        Header serverHeader = response.getFirstHeader(HttpHeaders.SERVER);
+                        if (serverHeader != null) {
+                            databaseVersion = serverHeader.getValue();
+                        }
+                    }
+                    return databaseVersion;
+                case HttpStatus.SC_UNAUTHORIZED:
+                case HttpStatus.SC_FORBIDDEN:
+                    throw getErrorReporter().errorAuth();
+                default:
+                    throw getErrorReporter().errorInConnection(String.valueOf(response.getStatusLine()));
+            }
+        } catch (IOException e) {
+            throw getErrorReporter().errorInConnection(e);
+        }
+    }
+
+    QueryServiceResponse submitStatement(String sql, List<?> args, boolean forceReadOnly, boolean compileOnly,
+            int timeoutSeconds, String catalog, String schema) throws SQLException {
+        HttpPost httpPost = new HttpPost(queryEndpoint);
+        httpPost.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON
+                .withParameters(new BasicNameValuePair(FORMAT_LOSSLESS_ADM, Boolean.TRUE.toString())).toString());
+
+        ByteArrayOutputStreamImpl baos = new ByteArrayOutputStreamImpl(512);
+        try {
+            JsonGenerator jsonGen = driverContext.genericObjectWriter.createGenerator(baos, JsonEncoding.UTF8);
+            jsonGen.writeStartObject();
+            jsonGen.writeStringField(CLIENT_TYPE, CLIENT_TYPE_JDBC);
+            jsonGen.writeStringField(MODE, MODE_DEFERRED);
+            jsonGen.writeStringField(STATEMENT, sql);
+            jsonGen.writeBooleanField(SIGNATURE, true);
+            jsonGen.writeStringField(PLAN_FORMAT, PLAN_FORMAT_STRING);
+            jsonGen.writeNumberField(MAX_WARNINGS, maxWarnings);
+            if (compileOnly) {
+                jsonGen.writeBooleanField(COMPILE_ONLY, true);
+            }
+            if (forceReadOnly) {
+                jsonGen.writeBooleanField(READ_ONLY, true);
+            }
+            if (timeoutSeconds > 0) {
+                jsonGen.writeStringField(TIMEOUT, timeoutSeconds + "s");
+            }
+            if (catalog != null) {
+                jsonGen.writeStringField(DATAVERSE, schema != null ? catalog + "/" + schema : catalog);
+            }
+            if (args != null && !args.isEmpty()) {
+                jsonGen.writeFieldName(ARGS);
+                driverContext.admFormatObjectWriter.writeValue(jsonGen, args);
+            }
+            jsonGen.writeEndObject();
+            jsonGen.flush();
+        } catch (InvalidDefinitionException e) {
+            throw getErrorReporter().errorUnexpectedType(e.getType().getRawClass());
+        } catch (IOException e) {
+            throw getErrorReporter().errorInRequestGeneration(e);
+        }
+
+        System.err.printf("<ADB_DRIVER_SQL>%n%s%n</ADB_DRIVER_SQL>%n", sql);
+
+        if (getLogger().isLoggable(Level.FINE)) {
+            getLogger().log(Level.FINE, String.format("%s { %s } with args { %s }", compileOnly ? "compile" : "execute",
+                    sql, args != null ? args : ""));
+        }
+
+        httpPost.setEntity(new EntityTemplateImpl(baos, ContentType.APPLICATION_JSON));
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost, httpClientContext)) {
+            return handlePostQueryResponse(httpResponse);
+        } catch (JsonProcessingException e) {
+            throw getErrorReporter().errorInProtocol(e);
+        } catch (IOException e) {
+            throw getErrorReporter().errorInConnection(e);
+        }
+    }
+
+    private QueryServiceResponse handlePostQueryResponse(CloseableHttpResponse httpResponse)
+            throws SQLException, IOException {
+        int httpStatus = httpResponse.getStatusLine().getStatusCode();
+        switch (httpStatus) {
+            case HttpStatus.SC_OK:
+            case HttpStatus.SC_BAD_REQUEST:
+            case HttpStatus.SC_INTERNAL_SERVER_ERROR:
+            case HttpStatus.SC_SERVICE_UNAVAILABLE:
+                break;
+            case HttpStatus.SC_UNAUTHORIZED:
+            case HttpStatus.SC_FORBIDDEN:
+                throw getErrorReporter().errorAuth();
+            default:
+                throw getErrorReporter().errorInProtocol(httpResponse.getStatusLine().toString());
+        }
+        QueryServiceResponse response;
+        try (InputStream contentStream = httpResponse.getEntity().getContent()) {
+            response = driverContext.genericObjectReader.readValue(contentStream, QueryServiceResponse.class);
+        }
+        QueryServiceResponse.Status status = response.status;
+        if (httpStatus == HttpStatus.SC_OK && status == QueryServiceResponse.Status.SUCCESS) {
+            return response;
+        }
+        if (status == QueryServiceResponse.Status.TIMEOUT) {
+            throw getErrorReporter().errorTimeout();
+        }
+        SQLException exc = getErrorIfExists(response);
+        if (exc != null) {
+            throw exc;
+        } else {
+            throw getErrorReporter().errorInProtocol(httpResponse.getStatusLine().toString());
+        }
+    }
+
+    JsonParser fetchResult(QueryServiceResponse response) throws SQLException {
+        if (response.handle == null) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        int p = response.handle.lastIndexOf("/");
+        if (p < 0) {
+            throw getErrorReporter().errorInProtocol(response.handle);
+        }
+        String handlePath = response.handle.substring(p);
+        URI resultRequestURI;
+        try {
+            resultRequestURI = new URI(queryResultEndpoint + handlePath);
+        } catch (URISyntaxException e) {
+            throw getErrorReporter().errorInProtocol(handlePath);
+        }
+        HttpGet httpGet = new HttpGet(resultRequestURI);
+        httpGet.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.getMimeType());
+
+        CloseableHttpResponse httpResponse = null;
+        InputStream httpContentStream = null;
+        JsonParser parser = null;
+        try {
+            httpResponse = httpClient.execute(httpGet, httpClientContext);
+            int httpStatus = httpResponse.getStatusLine().getStatusCode();
+            if (httpStatus != HttpStatus.SC_OK) {
+                throw getErrorReporter().errorNoResult();
+            }
+            HttpEntity entity = httpResponse.getEntity();
+            httpContentStream = entity.getContent();
+            parser = driverContext.genericObjectReader
+                    .createParser(new InputStreamWithAttachedResource(httpContentStream, httpResponse));
+            if (!advanceToArrayField(parser, RESULTS)) {
+                throw getErrorReporter().errorInProtocol();
+            }
+            parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
+            return parser;
+        } catch (SQLException e) {
+            closeQuietly(e, parser, httpContentStream, httpResponse);
+            throw e;
+        } catch (JsonProcessingException e) {
+            closeQuietly(e, parser, httpContentStream, httpResponse);
+            throw getErrorReporter().errorInProtocol(e);
+        } catch (IOException e) {
+            closeQuietly(e, parser, httpContentStream, httpResponse);
+            throw getErrorReporter().errorInConnection(e);
+        }
+    }
+
+    private boolean advanceToArrayField(JsonParser parser, String fieldName) throws IOException {
+        if (parser.nextToken() != JsonToken.START_OBJECT) {
+            return false;
+        }
+        for (;;) {
+            JsonToken token = parser.nextValue();
+            if (token == null || token == JsonToken.END_OBJECT) {
+                return false;
+            }
+            if (parser.currentName().equals(fieldName)) {
+                return token == JsonToken.START_ARRAY;
+            } else if (token.isStructStart()) {
+                parser.skipChildren();
+            } else {
+                parser.nextToken();
+            }
+        }
+    }
+
+    ArrayNode fetchExplainOnlyResult(QueryServiceResponse response,
+            ADBPreparedStatement.AbstractValueSerializer stringSer) throws SQLException {
+        if (response.results == null || response.results.isEmpty()) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        Object v = response.results.get(0);
+        if (!(v instanceof String)) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        try (BufferedReader br = new BufferedReader(new StringReader(v.toString()))) {
+            ArrayNode arrayNode = (ArrayNode) driverContext.genericObjectReader.createArrayNode();
+            String line;
+            while ((line = br.readLine()) != null) {
+                arrayNode.addObject().put(ADBProtocol.EXPLAIN_ONLY_RESULT_COLUMN_NAME,
+                        stringSer.serializeToString(line));
+            }
+            return arrayNode;
+        } catch (IOException e) {
+            throw getErrorReporter().errorInResultHandling(e);
+        }
+    }
+
+    private HttpClientContext createHttpClientContext(URI uri) {
+        HttpClientContext hcCtx = HttpClientContext.create();
+        AuthCache ac = new BasicAuthCache();
+        ac.put(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), new BasicScheme());
+        hcCtx.setAuthCache(ac);
+        return hcCtx;
+    }
+
+    boolean isStatementCategory(QueryServiceResponse response, QueryServiceResponse.StatementCategory category) {
+        return response.plans != null && category.equals(response.plans.statementCategory);
+    }
+
+    int getUpdateCount(QueryServiceResponse response) {
+        // TODO:need to get update count through the response
+        return isStatementCategory(response, QueryServiceResponse.StatementCategory.UPDATE) ? 1 : 0;
+    }
+
+    SQLException getErrorIfExists(QueryServiceResponse response) {
+        if (response.errors != null && !response.errors.isEmpty()) {
+            QueryServiceResponse.Message err = response.errors.get(0);
+            return new SQLException(err.msg, null, err.code);
+        }
+        return null;
+    }
+
+    List<QueryServiceResponse.Message> getWarningIfExists(QueryServiceResponse response) {
+        return response.warnings != null && !response.warnings.isEmpty() ? response.warnings : null;
+    }
+
+    SQLWarning createSQLWarning(List<QueryServiceResponse.Message> warnings) {
+        SQLWarning sqlWarning = null;
+        ListIterator<QueryServiceResponse.Message> i = warnings.listIterator(warnings.size());
+        while (i.hasPrevious()) {
+            QueryServiceResponse.Message w = i.previous();
+            SQLWarning sw = new SQLWarning(w.msg, null, w.code);
+            if (sqlWarning != null) {
+                sw.setNextWarning(sqlWarning);
+            }
+            sqlWarning = sw;
+        }
+        return sqlWarning;
+    }
+
+    List<ADBColumn> getColumns(QueryServiceResponse response) throws SQLException {
+        if (isExplainOnly(response)) {
+            return Collections.singletonList(new ADBColumn(EXPLAIN_ONLY_RESULT_COLUMN_NAME, ADBDatatype.STRING, false));
+        }
+        QueryServiceResponse.Signature signature = response.signature;
+        if (signature == null) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        List<String> nameList = signature.name;
+        List<String> typeList = signature.type;
+        if (nameList == null || nameList.isEmpty() || typeList == null || typeList.isEmpty()) {
+            throw getErrorReporter().errorBadResultSignature();
+        }
+        int count = nameList.size();
+        List<ADBColumn> result = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            String columnName = nameList.get(i);
+            String typeName = typeList.get(i);
+            boolean optional = false;
+            if (typeName.endsWith(OPTIONAL_TYPE_SUFFIX)) {
+                optional = true;
+                typeName = typeName.substring(0, typeName.length() - OPTIONAL_TYPE_SUFFIX.length());
+            }
+            ADBDatatype columnType = ADBDatatype.findByTypeName(typeName);
+            if (columnType == null) {
+                throw getErrorReporter().errorBadResultSignature();
+            }
+            result.add(new ADBColumn(columnName, columnType, optional));
+        }
+        return result;
+    }
+
+    boolean isExplainOnly(QueryServiceResponse response) {
+        return response.plans != null && Boolean.TRUE.equals(response.plans.explainOnly);
+    }
+
+    int getStatementParameterCount(QueryServiceResponse response) throws SQLException {
+        QueryServiceResponse.Plans plans = response.plans;
+        if (plans == null) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        if (plans.statementParameters == null) {
+            return 0;
+        }
+        int paramPos = 0;
+        for (Object param : plans.statementParameters) {
+            if (param instanceof Number) {
+                paramPos = Math.max(paramPos, ((Number) param).intValue());
+            } else {
+                throw getErrorReporter().errorParameterNotSupported(String.valueOf(param));
+            }
+        }
+        return paramPos;
+    }
+
+    JsonParser createJsonParser(JsonNode node) {
+        return driverContext.genericObjectReader.treeAsTokens(node);
+    }
+
+    ADBErrorReporter getErrorReporter() {
+        return driverContext.errorReporter;
+    }
+
+    Logger getLogger() {
+        return driverContext.logger;
+    }
+
+    static ObjectMapper createObjectMapper() {
+        ObjectMapper om = new ObjectMapper();
+        om.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.NON_PRIVATE);
+        // serialization
+        om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+
+        // deserialization
+        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        om.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+        om.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
+        return om;
+    }
+
+    private static void closeQuietly(Exception mainExc, java.io.Closeable... closeableList) {
+        for (Closeable closeable : closeableList) {
+            if (closeable != null) {
+                try {
+                    closeable.close();
+                } catch (IOException e) {
+                    if (mainExc != null) {
+                        mainExc.addSuppressed(e);
+                    }
+                }
+            }
+        }
+    }
+
+    static final class ByteArrayOutputStreamImpl extends ByteArrayOutputStream implements ContentProducer {
+        private ByteArrayOutputStreamImpl(int size) {
+            super(size);
+        }
+    }
+
+    static final class EntityTemplateImpl extends EntityTemplate {
+
+        private final long contentLength;
+
+        private EntityTemplateImpl(ByteArrayOutputStreamImpl baos, ContentType contentType) {
+            super(baos);
+            contentLength = baos.size();
+            setContentType(contentType.toString());
+        }
+
+        @Override
+        public long getContentLength() {
+            return contentLength;
+        }
+    }
+
+    static final class InputStreamWithAttachedResource extends FilterInputStream {
+
+        private final Closeable resource;
+
+        private InputStreamWithAttachedResource(InputStream delegate, Closeable resource) {
+            super(delegate);
+            this.resource = Objects.requireNonNull(resource);
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                super.close();
+            } finally {
+                resource.close();
+            }
+        }
+    }
+
+    public static class QueryServiceResponse {
+
+        public Status status;
+        public Plans plans;
+        public Signature signature;
+        public String handle;
+        public List<?> results; // currently only used for EXPLAIN results
+        public List<Message> errors;
+        public List<Message> warnings;
+
+        public enum Status {
+            RUNNING,
+            SUCCESS,
+            TIMEOUT,
+            FAILED,
+            FATAL
+        }
+
+        public static class Signature {
+            List<String> name;
+            List<String> type;
+        }
+
+        public static class Plans {
+            StatementCategory statementCategory;
+            List<Object> statementParameters;
+            Boolean explainOnly;
+        }
+
+        public static class Message {
+            int code;
+            String msg;
+        }
+
+        public enum StatementCategory {
+            QUERY,
+            UPDATE,
+            DDL,
+            PROCEDURE
+        }
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java
new file mode 100644
index 0000000..965855c
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSet.java
@@ -0,0 +1,1536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLType;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+final class ADBResultSet extends ADBWrapperSupport implements java.sql.ResultSet {
+
+    static final int RESULT_SET_HOLDABILITY = HOLD_CURSORS_OVER_COMMIT;
+
+    static final int ST_BEFORE_FIRST = 0;
+    static final int ST_NEXT = 1;
+    static final int ST_AFTER_LAST = 2;
+
+    // lifecycle
+    final AtomicBoolean closed = new AtomicBoolean(false);
+
+    // metadata
+    final ADBResultSetMetaData metadata;
+
+    // navigation
+    final JsonParser rowParser;
+    final boolean rowParserOwnsResources;
+    final long maxRows;
+
+    int state;
+    long rowNumber;
+    ADBRowStore rowStore;
+    ObjectReader complexColumnReader;
+    int columnIndexOfLatestGet;
+
+    // Lifecycle
+
+    ADBResultSet(ADBResultSetMetaData metadata, JsonParser rowParser, boolean rowParserOwnsResources, long maxRows) {
+        this.metadata = Objects.requireNonNull(metadata);
+        this.rowParser = Objects.requireNonNull(rowParser);
+        this.rowParserOwnsResources = rowParserOwnsResources;
+        this.maxRows = maxRows;
+        this.state = ST_BEFORE_FIRST;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        closeImpl(true);
+    }
+
+    void closeImpl(boolean notifyStatement) throws SQLException {
+        boolean wasClosed = closed.getAndSet(true);
+        if (wasClosed) {
+            return;
+        }
+        try {
+            rowParser.close();
+        } catch (IOException e) {
+            throw getErrorReporter().errorClosingResource(e);
+        } finally {
+            if (notifyStatement) {
+                metadata.statement.deregisterResultSet(this);
+            }
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    private void checkClosed() throws SQLException {
+        if (isClosed()) {
+            throw getErrorReporter().errorObjectClosed(ResultSet.class);
+        }
+    }
+
+    // Metadata
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        checkClosed();
+        return metadata;
+    }
+
+    // Navigation
+
+    @Override
+    public boolean next() throws SQLException {
+        checkClosed();
+        try {
+            switch (state) {
+                case ST_BEFORE_FIRST:
+                    JsonToken token = rowParser.hasCurrentToken() ? rowParser.currentToken() : rowParser.nextToken();
+                    if (token != JsonToken.START_ARRAY) {
+                        throw getErrorReporter().errorInProtocol(String.valueOf(token));
+                    }
+                    initRowStore();
+                    state = ST_NEXT;
+                    // fall thru to ST_NEXT
+                case ST_NEXT:
+                    token = rowParser.nextToken();
+                    switch (token) {
+                        case START_OBJECT:
+                            if (maxRows > 0 && rowNumber == maxRows) {
+                                state = ST_AFTER_LAST;
+                                return false;
+                            } else {
+                                readRow();
+                                rowNumber++;
+                                return true;
+                            }
+                        case END_ARRAY:
+                            state = ST_AFTER_LAST;
+                            return false;
+                        default:
+                            throw getErrorReporter().errorInProtocol(String.valueOf(token));
+                    }
+                case ST_AFTER_LAST:
+                    return false;
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
+            }
+        } catch (JsonProcessingException e) {
+            throw getErrorReporter().errorInProtocol(e);
+        } catch (IOException e) {
+            throw getErrorReporter().errorInConnection(e);
+        }
+    }
+
+    private void initRowStore() {
+        rowStore = createRowStore(metadata.getColumnCount());
+    }
+
+    protected ADBRowStore createRowStore(int columnCount) {
+        return new ADBRowStore(this, columnCount);
+    }
+
+    private void readRow() throws SQLException {
+        rowStore.reset();
+        columnIndexOfLatestGet = -1;
+        if (rowParser.currentToken() != JsonToken.START_OBJECT) {
+            throw new IllegalStateException();
+        }
+        try {
+            while (rowParser.nextToken() == JsonToken.FIELD_NAME) {
+                String fieldName = rowParser.getCurrentName();
+                int columnIndex = metadata.findColumnIndexByName(fieldName);
+                boolean isKnownColumn = columnIndex >= 0;
+                ADBColumn column = isKnownColumn ? metadata.getColumnByIndex(columnIndex) : null;
+
+                switch (rowParser.nextToken()) {
+                    case VALUE_NULL:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.NULL);
+                            rowStore.putNullColumn(columnIndex);
+                        }
+                        break;
+                    case VALUE_TRUE:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.BOOLEAN);
+                            rowStore.putBooleanColumn(columnIndex, true);
+                        }
+                        break;
+                    case VALUE_FALSE:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.BOOLEAN);
+                            rowStore.putBooleanColumn(columnIndex, false);
+                        }
+                        break;
+                    case VALUE_NUMBER_INT:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.BIGINT);
+                            long v = rowParser.getLongValue();
+                            rowStore.putInt64Column(columnIndex, v);
+                        }
+                        break;
+                    case VALUE_STRING:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.STRING);
+                            char[] textChars = rowParser.getTextCharacters();
+                            int textOffset = rowParser.getTextOffset();
+                            int textLength = rowParser.getTextLength();
+                            rowStore.putColumn(columnIndex, textChars, textOffset, textLength);
+                        }
+                        break;
+                    case START_OBJECT:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.OBJECT);
+                            Map<?, ?> valueMap = getComplexColumnReader().readValue(rowParser, Map.class);
+                            rowStore.putRecordColumn(columnIndex, valueMap);
+                        } else {
+                            rowParser.skipChildren();
+                        }
+                        break;
+                    case START_ARRAY:
+                        if (isKnownColumn) {
+                            typeCheck(column, ADBDatatype.ARRAY);
+                            List<?> valueList = getComplexColumnReader().readValue(rowParser, List.class);
+                            rowStore.putArrayColumn(columnIndex, valueList);
+                        } else {
+                            rowParser.skipChildren();
+                        }
+                        break;
+                    default:
+                        throw getErrorReporter().errorInProtocol(String.valueOf(rowParser.currentToken()));
+                }
+            }
+        } catch (JsonProcessingException e) {
+            throw getErrorReporter().errorInProtocol(e);
+        } catch (IOException e) {
+            throw getErrorReporter().errorInConnection(e);
+        }
+    }
+
+    private void typeCheck(ADBColumn column, ADBDatatype parsedType) throws SQLException {
+        ADBDatatype columnType = column.getType();
+
+        boolean typeMatch;
+        switch (parsedType) {
+            case NULL:
+                typeMatch = column.isOptional();
+                break;
+            case STRING:
+                // special handling for parsed 'string' because it can contain any primitive type.
+                // we only need to check that the expected type is not derived (i.e primitive/null/missing/any)
+                typeMatch = !columnType.isDerived();
+                break;
+            case ARRAY:
+                typeMatch = columnType == ADBDatatype.ANY || columnType.isList();
+                break;
+            case BOOLEAN:
+            case BIGINT:
+            case OBJECT:
+                typeMatch = columnType == ADBDatatype.ANY || columnType == parsedType;
+                break;
+            default:
+                // unexpected
+                throw getErrorReporter().errorInProtocol(parsedType.toString());
+        }
+        if (!typeMatch) {
+            throw getErrorReporter().errorUnexpectedColumnValue(parsedType, column.getName());
+        }
+    }
+
+    @Override
+    public void beforeFirst() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public void afterLast() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public boolean first() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public boolean last() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public boolean previous() throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public boolean relative(int rows) throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public boolean absolute(int row) throws SQLException {
+        checkClosed();
+        throw getErrorReporter().errorIncompatibleMode("FORWARD_ONLY");
+    }
+
+    @Override
+    public boolean isBeforeFirst() {
+        return state == ST_BEFORE_FIRST;
+    }
+
+    @Override
+    public boolean isAfterLast() {
+        return state == ST_AFTER_LAST;
+    }
+
+    @Override
+    public boolean isFirst() {
+        return state == ST_NEXT && rowNumber == 1;
+    }
+
+    @Override
+    public boolean isLast() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "isLast");
+    }
+
+    @Override
+    public int getRow() throws SQLException {
+        checkClosed();
+        return (int) rowNumber;
+    }
+
+    private void checkCursorPosition() throws SQLException {
+        if (state != ST_NEXT) {
+            throw getErrorReporter().errorNoCurrentRow();
+        }
+    }
+
+    private ObjectReader getComplexColumnReader() {
+        if (complexColumnReader == null) {
+            ADBDriverContext ctx = metadata.statement.connection.protocol.driverContext;
+            ADBRowStore tmpStore = createRowStore(1);
+            complexColumnReader = tmpStore.createComplexColumnObjectReader(ctx.admFormatObjectReader);
+        }
+        return complexColumnReader;
+    }
+
+    // Column accessors
+
+    @Override
+    public int findColumn(String columnLabel) throws SQLException {
+        checkClosed();
+        int columnIndex = metadata.findColumnIndexByName(columnLabel);
+        if (columnIndex < 0) {
+            throw getErrorReporter().errorColumnNotFound(columnLabel);
+        }
+        return columnIndex + 1;
+    }
+
+    // Column accessors: basic types
+
+    private int fetchColumnIndex(int columnNumber) throws SQLException {
+        if (columnNumber < 1 || columnNumber > metadata.getColumnCount()) {
+            throw getErrorReporter().errorColumnNotFound(String.valueOf(columnNumber));
+        }
+        return columnNumber - 1;
+    }
+
+    private int fetchColumnIndex(String columnLabel) throws SQLException {
+        int columnIndex = metadata.findColumnIndexByName(columnLabel);
+        if (columnIndex < 0) {
+            throw getErrorReporter().errorColumnNotFound(columnLabel);
+        }
+        return columnIndex;
+    }
+
+    @Override
+    public boolean wasNull() throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        if (columnIndexOfLatestGet < 0) {
+            return false;
+        }
+        ADBDatatype columnValueType = rowStore.getColumnType(columnIndexOfLatestGet);
+        return columnValueType == ADBDatatype.NULL || columnValueType == ADBDatatype.MISSING;
+    }
+
+    @Override
+    public boolean getBoolean(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBooleanImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public boolean getBoolean(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBooleanImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private boolean getBooleanImpl(int columnIndex) throws SQLException {
+        boolean v = rowStore.getBoolean(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public byte getByte(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getByteImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public byte getByte(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getByteImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private byte getByteImpl(int columnIndex) throws SQLException {
+        byte v = rowStore.getByte(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public short getShort(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getShortImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public short getShort(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getShortImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private short getShortImpl(int columnIndex) throws SQLException {
+        short v = rowStore.getShort(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public int getInt(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getIntImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public int getInt(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getIntImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private int getIntImpl(int columnIndex) throws SQLException {
+        int v = rowStore.getInt(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public long getLong(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getLongImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public long getLong(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getLongImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private long getLongImpl(int columnIndex) throws SQLException {
+        long v = rowStore.getLong(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public float getFloat(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getFloatImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public float getFloat(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getFloatImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private float getFloatImpl(int columnIndex) throws SQLException {
+        float v = rowStore.getFloat(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public double getDouble(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getDoubleImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public double getDouble(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getDoubleImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private double getDoubleImpl(int columnIndex) throws SQLException {
+        double v = rowStore.getDouble(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBigDecimalImpl(fetchColumnIndex(columnNumber), false, -1);
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(int columnNumber, int scale) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBigDecimalImpl(fetchColumnIndex(columnNumber), true, scale);
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBigDecimalImpl(fetchColumnIndex(columnLabel), false, -1);
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBigDecimalImpl(fetchColumnIndex(columnLabel), true, scale);
+    }
+
+    private BigDecimal getBigDecimalImpl(int columnIndex, boolean setScale, int scale) throws SQLException {
+        BigDecimal v = rowStore.getBigDecimal(columnIndex, setScale, scale);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public Date getDate(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getDateImpl(fetchColumnIndex(columnNumber), null);
+    }
+
+    @Override
+    public Date getDate(int columnNumber, Calendar cal) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getDateImpl(fetchColumnIndex(columnNumber), cal);
+    }
+
+    @Override
+    public Date getDate(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getDateImpl(fetchColumnIndex(columnLabel), null);
+    }
+
+    @Override
+    public Date getDate(String columnLabel, Calendar cal) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getDateImpl(fetchColumnIndex(columnLabel), cal);
+    }
+
+    private Date getDateImpl(int columnIndex, Calendar cal) throws SQLException {
+        Date v = rowStore.getDate(columnIndex, cal);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public Time getTime(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimeImpl(fetchColumnIndex(columnNumber), null);
+    }
+
+    @Override
+    public Time getTime(int columnNumber, Calendar cal) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimeImpl(fetchColumnIndex(columnNumber), cal);
+    }
+
+    @Override
+    public Time getTime(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimeImpl(fetchColumnIndex(columnLabel), null);
+    }
+
+    @Override
+    public Time getTime(String columnLabel, Calendar cal) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimeImpl(fetchColumnIndex(columnLabel), cal);
+    }
+
+    private Time getTimeImpl(int columnIndex, Calendar cal) throws SQLException {
+        Time v = rowStore.getTime(columnIndex, cal);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public Timestamp getTimestamp(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimestampImpl(fetchColumnIndex(columnNumber), null);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int columnNumber, Calendar cal) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimestampImpl(fetchColumnIndex(columnNumber), cal);
+    }
+
+    @Override
+    public Timestamp getTimestamp(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimestampImpl(fetchColumnIndex(columnLabel), null);
+    }
+
+    @Override
+    public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getTimestampImpl(fetchColumnIndex(columnLabel), cal);
+    }
+
+    private Timestamp getTimestampImpl(int columnIndex, Calendar cal) throws SQLException {
+        Timestamp v = rowStore.getTimestamp(columnIndex, cal);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public String getString(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getStringImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public String getString(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getStringImpl(fetchColumnIndex(columnLabel));
+    }
+
+    @Override
+    public String getNString(int columnNumber) throws SQLException {
+        return getString(columnNumber);
+    }
+
+    @Override
+    public String getNString(String columnLabel) throws SQLException {
+        return getString(columnLabel);
+    }
+
+    private String getStringImpl(int columnIndex) throws SQLException {
+        String v = rowStore.getString(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public byte[] getBytes(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBytesImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public byte[] getBytes(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBytesImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private byte[] getBytesImpl(int columnIndex) throws SQLException {
+        byte[] v = rowStore.getBinary(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    // Column accessor: Generic (getObject)
+
+    @Override
+    public Object getObject(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getObjectImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public Object getObject(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getObjectImpl(fetchColumnIndex(columnLabel));
+    }
+
+    @Override
+    public <T> T getObject(int columnNumber, Class<T> type) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        if (type == null) {
+            throw getErrorReporter().errorParameterValueNotSupported("type");
+        }
+        return getObjectImpl(fetchColumnIndex(columnNumber), type);
+    }
+
+    @Override
+    public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        if (type == null) {
+            throw getErrorReporter().errorParameterValueNotSupported("type");
+        }
+        return getObjectImpl(fetchColumnIndex(columnLabel), type);
+    }
+
+    private Object getObjectImpl(int columnIndex) throws SQLException {
+        ADBColumn column = metadata.getColumnByIndex(columnIndex);
+        return getObjectImpl(columnIndex, column.getType().getJavaClass());
+    }
+
+    private <T> T getObjectImpl(int columnIndex, Class<T> type) throws SQLException {
+        T v = rowStore.getObject(columnIndex, type);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public Object getObject(int columnIndex, Map<String, Class<?>> map) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getObject");
+    }
+
+    @Override
+    public Object getObject(String columnLabel, Map<String, Class<?>> map) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getObject");
+    }
+
+    // Column accessors: streams
+
+    @Override
+    public InputStream getBinaryStream(int columnIndex) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBinaryStreamImpl(fetchColumnIndex(columnIndex));
+    }
+
+    @Override
+    public InputStream getBinaryStream(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getBinaryStreamImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private InputStream getBinaryStreamImpl(int columnIndex) throws SQLException {
+        InputStream v = rowStore.getInputStream(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public Reader getCharacterStream(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getCharacterStreamImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public Reader getCharacterStream(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getCharacterStreamImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private Reader getCharacterStreamImpl(int columnIndex) throws SQLException {
+        Reader v = rowStore.getCharacterStream(columnIndex);
+        columnIndexOfLatestGet = columnIndex;
+        return v;
+    }
+
+    @Override
+    public Reader getNCharacterStream(int columnIndex) throws SQLException {
+        return getCharacterStream(columnIndex);
+    }
+
+    @Override
+    public Reader getNCharacterStream(String columnLabel) throws SQLException {
+        return getCharacterStream(columnLabel);
+    }
+
+    @Override
+    public InputStream getAsciiStream(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getAsciiStreamImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public InputStream getAsciiStream(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getAsciiStreamImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private InputStream getAsciiStreamImpl(int columnIndex) throws SQLException {
+        String value = getString(columnIndex);
+        return value != null ? new ByteArrayInputStream(value.getBytes(StandardCharsets.US_ASCII)) : null;
+    }
+
+    @Override
+    public InputStream getUnicodeStream(int columnNumber) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getUnicodeStreamImpl(fetchColumnIndex(columnNumber));
+    }
+
+    @Override
+    public InputStream getUnicodeStream(String columnLabel) throws SQLException {
+        checkClosed();
+        checkCursorPosition();
+        return getUnicodeStreamImpl(fetchColumnIndex(columnLabel));
+    }
+
+    private InputStream getUnicodeStreamImpl(int columnIndex) throws SQLException {
+        String value = getString(columnIndex);
+        return value != null ? new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_16)) : null;
+    }
+
+    // Column accessors: unsupported
+
+    @Override
+    public Ref getRef(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getRef");
+    }
+
+    @Override
+    public Ref getRef(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getRef");
+    }
+
+    @Override
+    public RowId getRowId(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getRowId");
+    }
+
+    @Override
+    public RowId getRowId(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getRowId");
+    }
+
+    @Override
+    public URL getURL(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getURL");
+    }
+
+    @Override
+    public URL getURL(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getURL");
+    }
+
+    // Column accessors: unsupported - LOB, Array, SQLXML
+
+    @Override
+    public Array getArray(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getArray");
+    }
+
+    @Override
+    public Array getArray(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getArray");
+    }
+
+    @Override
+    public Blob getBlob(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getBlob");
+    }
+
+    @Override
+    public Blob getBlob(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getBlob");
+    }
+
+    @Override
+    public Clob getClob(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getClob");
+    }
+
+    @Override
+    public Clob getClob(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getClob");
+    }
+
+    @Override
+    public NClob getNClob(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getNClob");
+    }
+
+    @Override
+    public NClob getNClob(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getNClob");
+    }
+
+    @Override
+    public SQLXML getSQLXML(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getSQLXML");
+    }
+
+    @Override
+    public SQLXML getSQLXML(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "getSQLXML");
+    }
+
+    // Updates (unsupported)
+
+    // Column setters
+
+    @Override
+    public void updateArray(int columnIndex, Array x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateArray");
+    }
+
+    @Override
+    public void updateArray(String columnLabel, Array x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateArray");
+    }
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateAsciiStream");
+    }
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateAsciiStream");
+    }
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateAsciiStream");
+    }
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateAsciiStream");
+    }
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateAsciiStream");
+    }
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateAsciiStream");
+    }
+
+    @Override
+    public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBigDecimal");
+    }
+
+    @Override
+    public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBigDecimal");
+    }
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBinaryStream");
+    }
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBinaryStream");
+    }
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBinaryStream");
+    }
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBinaryStream");
+    }
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBinaryStream");
+    }
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBinaryStream");
+    }
+
+    @Override
+    public void updateBlob(int columnIndex, Blob x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBlob");
+    }
+
+    @Override
+    public void updateBlob(String columnLabel, Blob x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBlob");
+    }
+
+    @Override
+    public void updateBlob(int columnIndex, InputStream inputStream, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBlob");
+    }
+
+    @Override
+    public void updateBlob(String columnLabel, InputStream inputStream, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBlob");
+    }
+
+    @Override
+    public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBlob");
+    }
+
+    @Override
+    public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBlob");
+    }
+
+    @Override
+    public void updateBoolean(int columnIndex, boolean x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBoolean");
+    }
+
+    @Override
+    public void updateBoolean(String columnLabel, boolean x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBoolean");
+    }
+
+    @Override
+    public void updateByte(int columnIndex, byte x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateByte");
+    }
+
+    @Override
+    public void updateByte(String columnLabel, byte x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateByte");
+    }
+
+    @Override
+    public void updateBytes(int columnIndex, byte[] x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBytes");
+    }
+
+    @Override
+    public void updateBytes(String columnLabel, byte[] x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateBytes");
+    }
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateCharacterStream");
+    }
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader, int length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateCharacterStream");
+    }
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateCharacterStream");
+    }
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateCharacterStream");
+    }
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateCharacterStream");
+    }
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateCharacterStream");
+    }
+
+    @Override
+    public void updateClob(int columnIndex, Clob x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateClob");
+    }
+
+    @Override
+    public void updateClob(String columnLabel, Clob x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateClob");
+    }
+
+    @Override
+    public void updateClob(int columnIndex, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateClob");
+    }
+
+    @Override
+    public void updateClob(String columnLabel, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateClob");
+    }
+
+    @Override
+    public void updateClob(int columnIndex, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateClob");
+    }
+
+    @Override
+    public void updateClob(String columnLabel, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateClob");
+    }
+
+    @Override
+    public void updateDate(int columnIndex, Date x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateDate");
+    }
+
+    @Override
+    public void updateDate(String columnLabel, Date x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateDate");
+    }
+
+    @Override
+    public void updateDouble(int columnIndex, double x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateDouble");
+    }
+
+    @Override
+    public void updateDouble(String columnLabel, double x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateDouble");
+    }
+
+    @Override
+    public void updateFloat(int columnIndex, float x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateFloat");
+    }
+
+    @Override
+    public void updateFloat(String columnLabel, float x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateFloat");
+    }
+
+    @Override
+    public void updateInt(int columnIndex, int x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateInt");
+    }
+
+    @Override
+    public void updateInt(String columnLabel, int x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateInt");
+    }
+
+    @Override
+    public void updateLong(int columnIndex, long x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateLong");
+    }
+
+    @Override
+    public void updateLong(String columnLabel, long x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateLong");
+    }
+
+    @Override
+    public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNCharacterStream");
+    }
+
+    @Override
+    public void updateNCharacterStream(String columnLabel, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNCharacterStream");
+    }
+
+    @Override
+    public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNCharacterStream");
+    }
+
+    @Override
+    public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNCharacterStream");
+    }
+
+    @Override
+    public void updateNClob(int columnIndex, NClob nClob) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNClob");
+    }
+
+    @Override
+    public void updateNClob(String columnLabel, NClob nClob) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNClob");
+    }
+
+    @Override
+    public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNClob");
+    }
+
+    @Override
+    public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNClob");
+    }
+
+    @Override
+    public void updateNClob(int columnIndex, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNClob");
+    }
+
+    @Override
+    public void updateNClob(String columnLabel, Reader reader) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNClob");
+    }
+
+    @Override
+    public void updateNString(int columnIndex, String nString) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNString");
+    }
+
+    @Override
+    public void updateNString(String columnLabel, String nString) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNString");
+    }
+
+    @Override
+    public void updateNull(int columnIndex) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNull");
+    }
+
+    @Override
+    public void updateNull(String columnLabel) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateNull");
+    }
+
+    @Override
+    public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(int columnIndex, Object x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(int columnIndex, Object x, SQLType targetSqlType, int scaleOrLength) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(String columnLabel, Object x, SQLType targetSqlType, int scaleOrLength)
+            throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(int columnIndex, Object x, SQLType targetSqlType) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(String columnLabel, Object x, SQLType targetSqlType) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateObject(String columnLabel, Object x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateObject");
+    }
+
+    @Override
+    public void updateRef(int columnIndex, Ref x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateRef");
+    }
+
+    @Override
+    public void updateRef(String columnLabel, Ref x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateRef");
+    }
+
+    @Override
+    public void updateRowId(int columnIndex, RowId x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateRowId");
+    }
+
+    @Override
+    public void updateRowId(String columnLabel, RowId x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateRowId");
+    }
+
+    @Override
+    public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateSQLXML");
+    }
+
+    @Override
+    public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateSQLXML");
+    }
+
+    @Override
+    public void updateShort(int columnIndex, short x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateShort");
+    }
+
+    @Override
+    public void updateShort(String columnLabel, short x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateShort");
+    }
+
+    @Override
+    public void updateString(int columnIndex, String x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateString");
+    }
+
+    @Override
+    public void updateString(String columnLabel, String x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateString");
+    }
+
+    @Override
+    public void updateTime(int columnIndex, Time x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateTime");
+    }
+
+    @Override
+    public void updateTime(String columnLabel, Time x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateTime");
+    }
+
+    @Override
+    public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateTimestamp");
+    }
+
+    @Override
+    public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateTimestamp");
+    }
+
+    // Update navigation and state (unsupported)
+
+    @Override
+    public void insertRow() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "insertRow");
+    }
+
+    @Override
+    public void updateRow() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "updateRow");
+    }
+
+    @Override
+    public void deleteRow() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "deleteRow");
+    }
+
+    @Override
+    public void refreshRow() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "refreshRow");
+    }
+
+    @Override
+    public void moveToInsertRow() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "moveToInsertRow");
+    }
+
+    @Override
+    public void moveToCurrentRow() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "moveToCurrentRow");
+    }
+
+    @Override
+    public boolean rowInserted() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "rowInserted");
+    }
+
+    @Override
+    public boolean rowUpdated() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "rowUpdated");
+    }
+
+    @Override
+    public boolean rowDeleted() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "rowDeleted");
+
+    }
+
+    @Override
+    public void cancelRowUpdates() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(ResultSet.class, "cancelRowUpdates");
+    }
+
+    // Errors and warnings
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        checkClosed();
+        return null;
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        checkClosed();
+    }
+
+    @Override
+    protected ADBErrorReporter getErrorReporter() {
+        return metadata.getErrorReporter();
+    }
+
+    // Ownership
+
+    @Override
+    public Statement getStatement() throws SQLException {
+        checkClosed();
+        return metadata.statement.getResultSetStatement(this);
+    }
+
+    // Cursor - related
+
+    @Override
+    public String getCursorName() throws SQLException {
+        checkClosed();
+        return "";
+    }
+
+    @Override
+    public int getType() throws SQLException {
+        checkClosed();
+        return TYPE_FORWARD_ONLY;
+    }
+
+    @Override
+    public int getConcurrency() throws SQLException {
+        checkClosed();
+        return ResultSet.CONCUR_READ_ONLY;
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        checkClosed();
+        return RESULT_SET_HOLDABILITY;
+    }
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        checkClosed();
+        return FETCH_FORWARD;
+    }
+
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {
+        checkClosed();
+        if (direction != ResultSet.FETCH_FORWARD) {
+            throw getErrorReporter().errorParameterValueNotSupported("direction");
+        }
+    }
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        checkClosed();
+        return 1;
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {
+        checkClosed();
+        // ignore value
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSetMetaData.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSetMetaData.java
new file mode 100644
index 0000000..67dd217
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBResultSetMetaData.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+final class ADBResultSetMetaData extends ADBWrapperSupport implements ResultSetMetaData {
+
+    final ADBStatement statement;
+
+    private final List<ADBColumn> columns;
+
+    private final Map<String, Integer> indexByName;
+
+    ADBResultSetMetaData(ADBStatement statement, List<ADBColumn> columns) {
+        this.statement = Objects.requireNonNull(statement);
+        this.columns = columns != null ? columns : Collections.emptyList();
+        this.indexByName = createIndexByName(this.columns);
+    }
+
+    @Override
+    public int getColumnCount() {
+        return columns.size();
+    }
+
+    @Override
+    public String getColumnName(int columnNumber) throws SQLException {
+        return getColumnByNumber(columnNumber).getName();
+    }
+
+    @Override
+    public String getColumnLabel(int columnNumber) throws SQLException {
+        return getColumnByNumber(columnNumber).getName();
+    }
+
+    @Override
+    public int getColumnType(int columnNumber) throws SQLException {
+        return getColumnByNumber(columnNumber).getType().getJdbcType().getVendorTypeNumber();
+    }
+
+    @Override
+    public String getColumnTypeName(int columnNumber) throws SQLException {
+        return getColumnByNumber(columnNumber).getType().getTypeName();
+    }
+
+    @Override
+    public String getColumnClassName(int columnNumber) throws SQLException {
+        return getColumnByNumber(columnNumber).getType().getJavaClass().getName();
+    }
+
+    @Override
+    public int getColumnDisplaySize(int columnNumber) {
+        // TODO:based on type
+        return 1;
+    }
+
+    @Override
+    public int getPrecision(int columnNumber) {
+        // TODO:based on type
+        return 0;
+    }
+
+    @Override
+    public int getScale(int columnNumber) {
+        return 0;
+    }
+
+    @Override
+    public boolean isAutoIncrement(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public boolean isCaseSensitive(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public boolean isCurrency(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public int isNullable(int columnNumber) throws SQLException {
+        return getColumnByNumber(columnNumber).isOptional() ? columnNullable : columnNoNulls;
+    }
+
+    @Override
+    public boolean isSearchable(int columnNumber) {
+        return true;
+    }
+
+    @Override
+    public boolean isSigned(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public boolean isReadOnly(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public boolean isWritable(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public boolean isDefinitelyWritable(int columnNumber) {
+        return false;
+    }
+
+    @Override
+    public String getCatalogName(int columnNumber) {
+        return "";
+    }
+
+    @Override
+    public String getSchemaName(int columnNumber) {
+        return "";
+    }
+
+    @Override
+    public String getTableName(int columnNumber) {
+        return "";
+    }
+
+    // Helpers
+
+    private ADBColumn getColumnByNumber(int columnNumber) throws SQLException {
+        return getColumnByIndex(toColumnIndex(columnNumber));
+    }
+
+    private int toColumnIndex(int columnNumber) throws SQLException {
+        boolean ok = 0 < columnNumber && columnNumber <= columns.size();
+        if (!ok) {
+            throw getErrorReporter().errorParameterValueNotSupported("columnNumber");
+        }
+        return columnNumber - 1;
+    }
+
+    ADBColumn getColumnByIndex(int idx) {
+        return columns.get(idx);
+    }
+
+    int findColumnIndexByName(String columnName) {
+        Integer idx = indexByName.get(columnName);
+        return idx != null ? idx : -1;
+    }
+
+    private static Map<String, Integer> createIndexByName(List<ADBColumn> columns) {
+        int n = columns.size();
+        switch (n) {
+            case 0:
+                return Collections.emptyMap();
+            case 1:
+                return Collections.singletonMap(columns.get(0).getName(), 0);
+            default:
+                Map<String, Integer> m = new HashMap<>();
+                for (int i = 0; i < n; i++) {
+                    m.put(columns.get(i).getName(), i);
+                }
+                return m;
+        }
+    }
+
+    @Override
+    protected ADBErrorReporter getErrorReporter() {
+        return statement.getErrorReporter();
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
new file mode 100644
index 0000000..da9c5bd
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
@@ -0,0 +1,1191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.format.DateTimeParseException;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+final class ADBRowStore {
+
+    private static final String ROW_STORE_ATTR_NAME = ADBRowStore.class.getSimpleName();
+
+    private static final int FLOAT_NAN_BITS = Float.floatToIntBits(Float.NaN);
+    private static final int FLOAT_POSITIVE_ZERO_BITS = Float.floatToIntBits(+0.0f);
+    private static final int FLOAT_NEGATIVE_ZERO_BITS = Float.floatToIntBits(-0.0f);
+    private static final long DOUBLE_NAN_BITS = Double.doubleToLongBits(Double.NaN);
+    private static final long DOUBLE_POSITIVE_ZERO_BITS = Double.doubleToLongBits(+0.0d);
+    private static final long DOUBLE_NEGATIVE_ZERO_BITS = Double.doubleToLongBits(-0.0d);
+
+    static final Map<Class<?>, GetObjectFunction> OBJECT_ACCESSORS_ATOMIC = createAtomicObjectAccessorMap();
+
+    static final List<Class<?>> GET_OBJECT_NON_ATOMIC = Arrays.asList(Collection.class, List.class, Map.class);
+
+    private final ADBResultSet resultSet;
+
+    private final ADBDatatype[] columnTypes;
+    private final Object[] objectStore;
+    private final long[] registerStore; // 2 registers per column
+
+    private int parsedLength;
+    private long currentDateChronon;
+    private JsonGenerator jsonGen;
+    private StringWriter jsonGenBuffer;
+
+    ADBRowStore(ADBResultSet resultSet, int initialColumnCount) {
+        this.resultSet = Objects.requireNonNull(resultSet);
+        columnTypes = new ADBDatatype[initialColumnCount];
+        objectStore = new Object[initialColumnCount];
+        registerStore = new long[initialColumnCount * 2];
+    }
+
+    void reset() {
+        Arrays.fill(columnTypes, ADBDatatype.MISSING);
+        Arrays.fill(registerStore, 0);
+        Arrays.fill(objectStore, null);
+    }
+
+    private void setColumnType(int columnIndex, ADBDatatype columnType) {
+        columnTypes[columnIndex] = columnType;
+    }
+
+    ADBDatatype getColumnType(int columnIndex) {
+        return columnTypes[columnIndex];
+    }
+
+    void putColumn(int columnIndex, char[] textChars, int textOffset, int textLength) throws SQLException {
+        byte valueTypeTag = parseTypeTag(textChars, textOffset, textLength);
+        ADBDatatype valueType = ADBDatatype.findByTypeTag(valueTypeTag);
+        if (valueType == null) {
+            throw getErrorReporter().errorUnexpectedType(valueTypeTag);
+        }
+
+        int nonTaggedOffset = textOffset + parsedLength;
+        int nonTaggedLength = textLength - parsedLength;
+        int nonTaggedEnd = nonTaggedOffset + nonTaggedLength; // = textOffset + textLength
+
+        setColumnType(columnIndex, valueType);
+
+        // NULL, BOOLEAN, BIGINT shouldn't normally happen. only handle here for completeness
+
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                // no content
+                break;
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME:
+            case DATETIME:
+            case YEARMONTHDURATION:
+            case DAYTIMEDURATION:
+                long r0 = parseInt64(textChars, nonTaggedOffset, nonTaggedEnd);
+                setColumnRegisters(columnIndex, r0, 0);
+                break;
+            case STRING:
+                objectStore[columnIndex] = new String(textChars, nonTaggedOffset, nonTaggedLength);
+                break;
+            case DURATION:
+                int delimiterOffset = indexOf(ADBProtocol.TEXT_DELIMITER, textChars, nonTaggedOffset, nonTaggedEnd);
+                if (delimiterOffset < 0 || delimiterOffset == nonTaggedEnd - 1) {
+                    throw getErrorReporter().errorInProtocol();
+                }
+                r0 = parseInt64(textChars, nonTaggedOffset, delimiterOffset);
+                long r1 = parseInt64(textChars, delimiterOffset + 1, nonTaggedEnd);
+                setColumnRegisters(columnIndex, r0, r1);
+                break;
+            case UUID:
+                // TODO: better encoding as 2 longs?
+                objectStore[columnIndex] = UUID.fromString(new String(textChars, nonTaggedOffset, nonTaggedLength));
+                break;
+            case OBJECT:
+            case ARRAY:
+            case MULTISET:
+                // Unexpected (shouldn't be called)
+                throw new IllegalArgumentException(String.valueOf(valueType));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    void putNullColumn(int columnIndex) {
+        setColumnType(columnIndex, ADBDatatype.NULL);
+    }
+
+    void putBooleanColumn(int columnIndex, boolean value) {
+        setColumnType(columnIndex, ADBDatatype.BOOLEAN);
+        setColumnRegisters(columnIndex, value ? 1 : 0, 0);
+    }
+
+    void putInt64Column(int columnIndex, long value) {
+        setColumnType(columnIndex, ADBDatatype.BIGINT);
+        setColumnRegisters(columnIndex, value, 0);
+    }
+
+    void putArrayColumn(int columnIndex, List<?> value) {
+        setColumnType(columnIndex, ADBDatatype.ARRAY);
+        objectStore[columnIndex] = Objects.requireNonNull(value);
+    }
+
+    void putRecordColumn(int columnIndex, Map<?, ?> value) {
+        setColumnType(columnIndex, ADBDatatype.OBJECT);
+        objectStore[columnIndex] = Objects.requireNonNull(value);
+    }
+
+    private void setColumnRegisters(int columnIndex, long r0, long r1) {
+        int registerPos = columnIndex * 2;
+        registerStore[registerPos] = r0;
+        registerStore[++registerPos] = r1;
+    }
+
+    private long getColumnRegister(int columnIndex, int registerIndex) {
+        int registerPos = columnIndex * 2;
+        switch (registerIndex) {
+            case 0:
+                break;
+            case 1:
+                registerPos++;
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+        return registerStore[registerPos];
+    }
+
+    private boolean getColumnRegisterAsBoolean(int columnIndex, int registerIndex) {
+        return getColumnRegister(columnIndex, registerIndex) != 0;
+    }
+
+    private byte getColumnRegisterAsByte(int columnIndex, int registerIndex) {
+        return (byte) getColumnRegister(columnIndex, registerIndex);
+    }
+
+    private short getColumnRegisterAsShort(int columnIndex, int registerIndex) {
+        return (short) getColumnRegister(columnIndex, registerIndex);
+    }
+
+    private int getColumnRegisterAsInt(int columnIndex, int registerIndex) {
+        return (int) getColumnRegister(columnIndex, registerIndex);
+    }
+
+    private float getColumnRegisterAsFloat(int columnIndex, int registerIndex) {
+        return Float.intBitsToFloat(getColumnRegisterAsFloatBits(columnIndex, registerIndex));
+    }
+
+    private boolean isColumnRegisterZeroOrNanFloat(int columnIndex, int registerIndex) {
+        int bits = getColumnRegisterAsFloatBits(columnIndex, registerIndex);
+        return bits == FLOAT_POSITIVE_ZERO_BITS || bits == FLOAT_NEGATIVE_ZERO_BITS || bits == FLOAT_NAN_BITS;
+    }
+
+    private int getColumnRegisterAsFloatBits(int columnIndex, int registerIndex) {
+        return getColumnRegisterAsInt(columnIndex, registerIndex);
+    }
+
+    private double getColumnRegisterAsDouble(int columnIndex, int registerIndex) {
+        return Double.longBitsToDouble(getColumnRegisterAsDoubleBits(columnIndex, registerIndex));
+    }
+
+    private boolean isColumnRegisterZeroOrNanDouble(int columnIndex, int registerIndex) {
+        long bits = getColumnRegisterAsDoubleBits(columnIndex, registerIndex);
+        return bits == DOUBLE_POSITIVE_ZERO_BITS || bits == DOUBLE_NEGATIVE_ZERO_BITS || bits == DOUBLE_NAN_BITS;
+    }
+
+    private long getColumnRegisterAsDoubleBits(int columnIndex, int registerIndex) {
+        return getColumnRegister(columnIndex, registerIndex);
+    }
+
+    private Period getColumnRegisterAsPeriod(int columnIndex, int registerIndex) {
+        return Period.ofMonths((int) getColumnRegister(columnIndex, registerIndex));
+    }
+
+    private Duration getColumnRegisterAsDuration(int columnIndex, int registerIndex) {
+        return Duration.ofMillis((int) getColumnRegister(columnIndex, registerIndex));
+    }
+
+    private Number getNumberFromObjectStore(int columnIndex) {
+        Object o = objectStore[columnIndex];
+        if (o != null) {
+            return (Number) o;
+        }
+        Number n;
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case TINYINT:
+                n = getColumnRegisterAsByte(columnIndex, 0);
+                break;
+            case SMALLINT:
+                n = getColumnRegisterAsShort(columnIndex, 0);
+                break;
+            case INTEGER:
+                n = getColumnRegisterAsInt(columnIndex, 0);
+                break;
+            case BIGINT:
+                n = getColumnRegister(columnIndex, 0);
+                break;
+            case FLOAT:
+                n = getColumnRegisterAsFloat(columnIndex, 0);
+                break;
+            case DOUBLE:
+                n = getColumnRegisterAsDouble(columnIndex, 0);
+                break;
+            default:
+                throw new IllegalArgumentException(String.valueOf(valueType));
+        }
+        objectStore[columnIndex] = n;
+        return n;
+    }
+
+    private String getStringFromObjectStore(int columnIndex) {
+        return (String) objectStore[columnIndex];
+    }
+
+    private UUID getUUIDFromObjectStore(int columnIndex) {
+        return (UUID) objectStore[columnIndex];
+    }
+
+    private Period getPeriodFromObjectStore(int columnIndex) {
+        Object o = objectStore[columnIndex];
+        if (o != null) {
+            return (Period) o;
+        }
+        ADBDatatype valueType = getColumnType(columnIndex);
+        if (valueType != ADBDatatype.YEARMONTHDURATION) {
+            throw new IllegalArgumentException(String.valueOf(valueType));
+        }
+        Period v = getColumnRegisterAsPeriod(columnIndex, 0);
+        objectStore[columnIndex] = v;
+        return v;
+    }
+
+    private Duration getDurationFromObjectStore(int columnIndex) {
+        Object o = objectStore[columnIndex];
+        if (o != null) {
+            return (Duration) o;
+        }
+        ADBDatatype valueType = getColumnType(columnIndex);
+        if (valueType != ADBDatatype.DAYTIMEDURATION) {
+            throw new IllegalArgumentException(String.valueOf(valueType));
+        }
+        Duration v = getColumnRegisterAsDuration(columnIndex, 0);
+        objectStore[columnIndex] = v;
+        return v;
+    }
+
+    private String getISODurationStringFromObjectStore(int columnIndex) {
+        Object o = objectStore[columnIndex];
+        if (o != null) {
+            return (String) o;
+        }
+        ADBDatatype valueType = getColumnType(columnIndex);
+        if (valueType != ADBDatatype.DURATION) {
+            throw new IllegalArgumentException(String.valueOf(valueType));
+        }
+        String v = getColumnRegisterAsPeriod(columnIndex, 0).toString()
+                + getColumnRegisterAsDuration(columnIndex, 1).toString().substring(1);
+        objectStore[columnIndex] = v;
+        return v;
+    }
+
+    boolean getBoolean(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return false;
+            case BOOLEAN:
+                return getColumnRegisterAsBoolean(columnIndex, 0);
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return getColumnRegister(columnIndex, 0) != 0;
+            case FLOAT:
+                return !isColumnRegisterZeroOrNanFloat(columnIndex, 0);
+            case DOUBLE:
+                return !isColumnRegisterZeroOrNanDouble(columnIndex, 0);
+            case STRING:
+                return Boolean.parseBoolean(getStringFromObjectStore(columnIndex));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    byte getByte(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return 0;
+            case BOOLEAN:
+                return (byte) (getColumnRegisterAsBoolean(columnIndex, 0) ? 1 : 0);
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return getColumnRegisterAsByte(columnIndex, 0);
+            case FLOAT:
+                return (byte) getColumnRegisterAsFloat(columnIndex, 0);
+            case DOUBLE:
+                return (byte) getColumnRegisterAsDouble(columnIndex, 0);
+            case STRING:
+                return (byte) parseInt64(getStringFromObjectStore(columnIndex));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    short getShort(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return 0;
+            case BOOLEAN:
+                return (short) (getColumnRegisterAsBoolean(columnIndex, 0) ? 1 : 0);
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return getColumnRegisterAsShort(columnIndex, 0);
+            case FLOAT:
+                return (short) getColumnRegisterAsFloat(columnIndex, 0);
+            case DOUBLE:
+                return (short) getColumnRegisterAsDouble(columnIndex, 0);
+            case STRING:
+                return (short) parseInt64(getStringFromObjectStore(columnIndex));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    int getInt(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return 0;
+            case BOOLEAN:
+                return getColumnRegisterAsBoolean(columnIndex, 0) ? 1 : 0;
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case DATE:
+            case TIME:
+            case YEARMONTHDURATION:
+                return getColumnRegisterAsInt(columnIndex, 0);
+            case FLOAT:
+                return (int) getColumnRegisterAsFloat(columnIndex, 0);
+            case DOUBLE:
+                return (int) getColumnRegisterAsDouble(columnIndex, 0);
+            case STRING:
+                return (int) parseInt64(getStringFromObjectStore(columnIndex));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    long getLong(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return 0;
+            case BOOLEAN:
+                return getColumnRegisterAsBoolean(columnIndex, 0) ? 1 : 0;
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case DATE:
+            case TIME:
+            case DATETIME:
+            case YEARMONTHDURATION:
+            case DAYTIMEDURATION:
+                return getColumnRegister(columnIndex, 0);
+            case FLOAT:
+                return (long) getColumnRegisterAsFloat(columnIndex, 0);
+            case DOUBLE:
+                return (long) getColumnRegisterAsDouble(columnIndex, 0);
+            case STRING:
+                return parseInt64(getStringFromObjectStore(columnIndex));
+            default:
+                // TODO:support temporal types?
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    float getFloat(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return 0;
+            case BOOLEAN:
+                return getColumnRegisterAsBoolean(columnIndex, 0) ? 1f : 0f;
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return getColumnRegister(columnIndex, 0);
+            case FLOAT:
+                return getColumnRegisterAsFloat(columnIndex, 0);
+            case DOUBLE:
+                return (float) getColumnRegisterAsDouble(columnIndex, 0);
+            case STRING:
+                try {
+                    return Float.parseFloat(getStringFromObjectStore(columnIndex));
+                } catch (NumberFormatException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    double getDouble(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return 0;
+            case BOOLEAN:
+                return getColumnRegisterAsBoolean(columnIndex, 0) ? 1d : 0d;
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return getColumnRegister(columnIndex, 0);
+            case FLOAT:
+                return getColumnRegisterAsFloat(columnIndex, 0);
+            case DOUBLE:
+                return getColumnRegisterAsDouble(columnIndex, 0);
+            case STRING:
+                try {
+                    return Double.parseDouble(getStringFromObjectStore(columnIndex));
+                } catch (NumberFormatException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+        return getBigDecimal(columnIndex, false, 0);
+    }
+
+    @SuppressWarnings("UnpredictableBigDecimalConstructorCall")
+    BigDecimal getBigDecimal(int columnIndex, boolean setScale, int scale) throws SQLException {
+        BigDecimal dec;
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case BOOLEAN:
+                dec = getColumnRegisterAsBoolean(columnIndex, 0) ? BigDecimal.ONE : BigDecimal.ZERO;
+                break;
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case DATE:
+            case TIME:
+            case DATETIME:
+            case YEARMONTHDURATION:
+            case DAYTIMEDURATION:
+                dec = BigDecimal.valueOf(getColumnRegister(columnIndex, 0));
+                break;
+            case FLOAT:
+                try {
+                    dec = new BigDecimal(getColumnRegisterAsFloat(columnIndex, 0));
+                } catch (NumberFormatException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+                break;
+            case DOUBLE:
+                try {
+                    dec = new BigDecimal(getColumnRegisterAsDouble(columnIndex, 0));
+                } catch (NumberFormatException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+                break;
+            case STRING:
+                try {
+                    dec = new BigDecimal(getStringFromObjectStore(columnIndex));
+                } catch (NumberFormatException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+                break;
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+
+        return setScale ? dec.setScale(scale, RoundingMode.DOWN) : dec;
+    }
+
+    private Date getDate(int columnIndex) throws SQLException {
+        return getDate(columnIndex, null);
+    }
+
+    Date getDate(int columnIndex, Calendar cal) throws SQLException {
+        // TODO:cal is not used
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case DATE:
+                return toDateFromDateChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toDateFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case STRING:
+                try {
+                    LocalDate d = LocalDate.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                    return new Date(d.getYear() - 1900, d.getMonthValue() - 1, d.getDayOfMonth());
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    LocalDate getLocalDate(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case DATE:
+                return toLocalDateFromDateChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toLocalDateFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case STRING:
+                try {
+                    return LocalDate.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    private Time getTime(int columnIndex) throws SQLException {
+        return getTime(columnIndex, null);
+    }
+
+    Time getTime(int columnIndex, Calendar cal) throws SQLException {
+        // TODO:cal not used
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case TIME:
+                return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case STRING:
+                try {
+                    LocalTime t = LocalTime.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                    return toTimeFromTimeChronon(TimeUnit.NANOSECONDS.toMillis(t.toNanoOfDay()));
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    LocalTime getLocalTime(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case TIME:
+                return toLocalTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toLocalTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case STRING:
+                try {
+                    return LocalTime.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    private Timestamp getTimestamp(int columnIndex) throws SQLException {
+        return getTimestamp(columnIndex, null);
+    }
+
+    Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
+        //TODO:FIXME:CAL NOT USED
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case DATE:
+                return toTimestampFromDateChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case STRING:
+                try {
+                    Instant i = Instant.parse(getStringFromObjectStore(columnIndex));
+                    long millis0 = TimeUnit.SECONDS.toMillis(i.getEpochSecond());
+                    long millis1 = TimeUnit.NANOSECONDS.toMillis(i.getNano());
+                    return new Timestamp(millis0 + millis1);
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    Instant getInstant(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case DATE:
+                return toInstantFromDateChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toInstantFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case STRING:
+                try {
+                    return Instant.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    Period getPeriod(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case YEARMONTHDURATION:
+                return getPeriodFromObjectStore(columnIndex);
+            case DURATION:
+                return getColumnRegisterAsPeriod(columnIndex, 0);
+            case STRING:
+                try {
+                    return Period.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    Duration getDuration(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case DAYTIMEDURATION:
+                return getDurationFromObjectStore(columnIndex);
+            case DURATION:
+                return getColumnRegisterAsDuration(columnIndex, 1);
+            case STRING:
+                try {
+                    return Duration.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                } catch (DateTimeParseException e) {
+                    throw getErrorReporter().errorInvalidValueOfType(valueType);
+                }
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    byte[] getBinary(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case STRING:
+                return getStringFromObjectStore(columnIndex).getBytes(StandardCharsets.UTF_8);
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    UUID getUUID(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case UUID:
+                return getUUIDFromObjectStore(columnIndex);
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    String getString(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case BOOLEAN:
+                return Boolean.toString(getColumnRegisterAsBoolean(columnIndex, 0));
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return Long.toString(getColumnRegister(columnIndex, 0));
+            case FLOAT:
+                return Float.toString(getColumnRegisterAsFloat(columnIndex, 0));
+            case DOUBLE:
+                return Double.toString(getColumnRegisterAsDouble(columnIndex, 0));
+            case DATE:
+                return toLocalDateFromDateChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
+            case TIME:
+                return toLocalTimeFromTimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
+            case DATETIME:
+                return toInstantFromDatetimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
+            case YEARMONTHDURATION:
+                return getPeriodFromObjectStore(columnIndex).toString(); // TODO:review
+            case DAYTIMEDURATION:
+                return getDurationFromObjectStore(columnIndex).toString(); // TODO:review
+            case DURATION:
+                return getISODurationStringFromObjectStore(columnIndex); // TODO:review
+            case STRING:
+                return getStringFromObjectStore(columnIndex);
+            case UUID:
+                return getUUIDFromObjectStore(columnIndex).toString();
+            case OBJECT:
+            case ARRAY:
+                return printAsJson(objectStore[columnIndex]);
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    Reader getCharacterStream(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case STRING:
+                return new StringReader(getStringFromObjectStore(columnIndex));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    InputStream getInputStream(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case STRING:
+                return new ByteArrayInputStream(getStringFromObjectStore(columnIndex).getBytes(StandardCharsets.UTF_8));
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    Object getObject(int columnIndex) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            case BOOLEAN:
+                return getColumnRegisterAsBoolean(columnIndex, 0);
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return getNumberFromObjectStore(columnIndex);
+            case DATE:
+                return toDateFromDateChronon(getColumnRegister(columnIndex, 0));
+            case TIME:
+                return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+            case DATETIME:
+                return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+            case YEARMONTHDURATION:
+                return getPeriodFromObjectStore(columnIndex);
+            case DAYTIMEDURATION:
+                return getDurationFromObjectStore(columnIndex);
+            case DURATION:
+                return getISODurationStringFromObjectStore(columnIndex);
+            case STRING:
+                return getStringFromObjectStore(columnIndex);
+            case UUID:
+                return getUUIDFromObjectStore(columnIndex);
+            case OBJECT:
+            case ARRAY:
+                return objectStore[columnIndex]; // TODO:how to make immutable?
+            default:
+                throw getErrorReporter().errorUnexpectedType(valueType);
+        }
+    }
+
+    <T> T getObject(int columnIndex, Class<T> targetType) throws SQLException {
+        ADBDatatype valueType = getColumnType(columnIndex);
+        switch (valueType) {
+            case MISSING:
+            case NULL:
+                return null;
+            default:
+                GetObjectFunction getter = OBJECT_ACCESSORS_ATOMIC.get(targetType);
+                Object v;
+                if (getter != null) {
+                    v = getter.getObject(this, columnIndex);
+                } else if (GET_OBJECT_NON_ATOMIC.contains(targetType)) {
+                    v = getObject(columnIndex);
+                } else {
+                    throw getErrorReporter().errorUnexpectedType(targetType);
+                }
+                return targetType.cast(v);
+        }
+    }
+
+    interface GetObjectFunction {
+        Object getObject(ADBRowStore rowStore, int columnIndex) throws SQLException;
+    }
+
+    private static Map<Class<?>, GetObjectFunction> createAtomicObjectAccessorMap() {
+        Map<Class<?>, GetObjectFunction> map = new HashMap<>();
+        map.put(Boolean.TYPE, ADBRowStore::getBoolean);
+        map.put(Boolean.class, ADBRowStore::getBoolean);
+        map.put(Byte.TYPE, ADBRowStore::getByte);
+        map.put(Byte.class, ADBRowStore::getByte);
+        map.put(Short.TYPE, ADBRowStore::getShort);
+        map.put(Short.class, ADBRowStore::getShort);
+        map.put(Integer.TYPE, ADBRowStore::getInt);
+        map.put(Integer.class, ADBRowStore::getInt);
+        map.put(Long.TYPE, ADBRowStore::getLong);
+        map.put(Long.class, ADBRowStore::getLong);
+        map.put(Float.TYPE, ADBRowStore::getFloat);
+        map.put(Float.class, ADBRowStore::getFloat);
+        map.put(Double.TYPE, ADBRowStore::getDouble);
+        map.put(Double.class, ADBRowStore::getDouble);
+        map.put(BigDecimal.class, ADBRowStore::getBigDecimal);
+        map.put(Date.class, ADBRowStore::getDate);
+        map.put(LocalDate.class, ADBRowStore::getLocalDate);
+        map.put(Time.class, ADBRowStore::getTime);
+        map.put(LocalTime.class, ADBRowStore::getLocalTime);
+        map.put(Timestamp.class, ADBRowStore::getTimestamp);
+        map.put(Instant.class, ADBRowStore::getInstant);
+        map.put(Period.class, ADBRowStore::getPeriod);
+        map.put(Duration.class, ADBRowStore::getDuration);
+        map.put(UUID.class, ADBRowStore::getUUID);
+        map.put(String.class, ADBRowStore::getString);
+        return map;
+    }
+
+    private Date toDateFromDateChronon(long dateChrononInDays) {
+        return new Date(TimeUnit.DAYS.toMillis(dateChrononInDays));
+    }
+
+    private Date toDateFromDatetimeChronon(long datetimeChrononInMillis) {
+        return new Date(datetimeChrononInMillis);
+    }
+
+    private LocalDate toLocalDateFromDateChronon(long dateChrononInDays) {
+        return LocalDate.ofEpochDay(dateChrononInDays);
+    }
+
+    private LocalDate toLocalDateFromDatetimeChronon(long datetimeChrononInMillis) {
+        return LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(datetimeChrononInMillis));
+    }
+
+    private Time toTimeFromTimeChronon(long timeChrononInMillis) {
+        long datetimeChrononInMillis = getCurrentDateChrononInMillis() + timeChrononInMillis;
+        return new Time(datetimeChrononInMillis);
+    }
+
+    private Time toTimeFromDatetimeChronon(long datetimeChrononInMillis) {
+        return new Time(datetimeChrononInMillis);
+    }
+
+    private LocalTime toLocalTimeFromTimeChronon(long timeChrononInMillis) {
+        return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(timeChrononInMillis));
+    }
+
+    private LocalTime toLocalTimeFromDatetimeChronon(long datetimeChrononInMillis) {
+        return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(datetimeChrononInMillis));
+    }
+
+    private Timestamp toTimestampFromDatetimeChronon(long datetimeChrononInMillis) {
+        return new Timestamp(datetimeChrononInMillis);
+    }
+
+    private Timestamp toTimestampFromDateChronon(long dateChrononInDays) {
+        return new Timestamp(TimeUnit.DAYS.toMillis(dateChrononInDays));
+    }
+
+    private Instant toInstantFromDatetimeChronon(long datetimeChrononInMillis) {
+        return Instant.ofEpochMilli(datetimeChrononInMillis);
+    }
+
+    private Instant toInstantFromDateChronon(long dateChrononInDays) {
+        return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(dateChrononInDays));
+    }
+
+    private long getCurrentDateChrononInMillis() {
+        if (currentDateChronon == 0) {
+            long chrononOfDay = TimeUnit.DAYS.toMillis(1);
+            currentDateChronon = System.currentTimeMillis() / chrononOfDay * chrononOfDay;
+        }
+        return currentDateChronon;
+    }
+
+    private String printAsJson(Object value) throws SQLException {
+        if (jsonGenBuffer == null) {
+            jsonGenBuffer = new StringWriter();
+            try {
+                //TODO:FIXME:need to configure generator to print java.sql.Date/Times properly
+                jsonGen = resultSet.metadata.statement.connection.protocol.driverContext.genericObjectWriter
+                        .createGenerator(jsonGenBuffer);
+            } catch (IOException e) {
+                throw getErrorReporter().errorInResultHandling(e);
+            }
+        }
+        try {
+            jsonGen.writeObject(value);
+            jsonGen.flush();
+            return jsonGenBuffer.getBuffer().toString();
+        } catch (IOException e) {
+            throw getErrorReporter().errorInResultHandling(e);
+        } finally {
+            jsonGenBuffer.getBuffer().setLength(0);
+        }
+    }
+
+    ObjectReader createComplexColumnObjectReader(ObjectReader templateReader) {
+        return templateReader.withAttribute(ROW_STORE_ATTR_NAME, this);
+    }
+
+    static void configureDeserialization(ObjectMapper objectMapper, SimpleModule serdeModule) {
+        objectMapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true);
+        serdeModule.setDeserializerModifier(createADMFormatDeserializerModifier());
+    }
+
+    private static BeanDeserializerModifier createADMFormatDeserializerModifier() {
+        return new BeanDeserializerModifier() {
+            @Override
+            public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config, BeanDescription beanDesc,
+                    JsonDeserializer<?> deserializer) {
+                if (String.class.equals(beanDesc.getClassInfo().getAnnotated())) {
+                    ADBRowStore rowStore = (ADBRowStore) config.getAttributes().getAttribute(ROW_STORE_ATTR_NAME);
+                    return rowStore.createADMFormatStringDeserializer();
+                } else {
+                    return deserializer;
+                }
+            }
+        };
+    }
+
+    private JsonDeserializer<?> createADMFormatStringDeserializer() {
+        return new JsonDeserializer<Object>() {
+            @Override
+            public Object deserialize(JsonParser parser, DeserializationContext ctx) throws IOException {
+                if (!parser.hasToken(JsonToken.VALUE_STRING)) {
+                    throw new IOException("Unexpected token");
+                }
+                try {
+                    ADBRowStore.this.reset();
+                    ADBRowStore.this.putColumn(0, parser.getTextCharacters(), parser.getTextOffset(),
+                            parser.getTextLength());
+                    return ADBRowStore.this.getObject(0);
+                } catch (SQLException e) {
+                    throw new IOException(e);
+                }
+            }
+        };
+    }
+
+    @FunctionalInterface
+    public interface ICharAccessor<T> {
+        char charAt(T input, int index);
+    }
+
+    private long parseInt64(CharSequence buffer) throws SQLException {
+        return parseInt64(buffer, 0, buffer.length(), CharSequence::charAt);
+    }
+
+    private long parseInt64(char[] buffer, int begin, int end) throws SQLException {
+        return parseInt64(buffer, begin, end, (input, index) -> input[index]);
+    }
+
+    private <T> long parseInt64(T buffer, int begin, int end, ICharAccessor<T> charAccessor) throws SQLException {
+        if (end < begin) {
+            throw new IllegalArgumentException();
+        }
+        boolean positive = true;
+        long value = 0;
+        int offset = begin;
+        char c = charAccessor.charAt(buffer, offset);
+        if (c == '+') {
+            offset++;
+        } else if (c == '-') {
+            offset++;
+            positive = false;
+        }
+        try {
+            for (; offset < end; offset++) {
+                c = charAccessor.charAt(buffer, offset);
+                if (c >= '0' && c <= '9') {
+                    value = Math.addExact(Math.multiplyExact(value, 10L), '0' - c);
+                } else {
+                    throw getErrorReporter().errorInProtocol(String.valueOf(c));
+                }
+            }
+            if (positive) {
+                value = Math.multiplyExact(value, -1L);
+            }
+            return value;
+        } catch (ArithmeticException e) {
+            throw getErrorReporter().errorInProtocol();
+        }
+    }
+
+    private byte parseTypeTag(char[] textChars, int textOffset, int textLength) throws SQLException {
+        if (textLength == 0) {
+            // empty string
+            parsedLength = 0;
+            return ADBDatatype.STRING.getTypeTag();
+        }
+        if (textChars[textOffset] == ADBProtocol.TEXT_DELIMITER) {
+            // any string
+            parsedLength = 1;
+            return ADBDatatype.STRING.getTypeTag();
+        }
+        // any type
+        int typeTagLength = 2;
+        if (textLength < typeTagLength) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        byte parsedTypeTag = getByteFromValidHexChars(textChars[textOffset], textChars[textOffset + 1]);
+        if (parsedTypeTag == ADBDatatype.MISSING.getTypeTag() || parsedTypeTag == ADBDatatype.NULL.getTypeTag()) {
+            parsedLength = typeTagLength;
+            return parsedTypeTag;
+        }
+        int delimiterLength = 1;
+        if (textLength < typeTagLength + delimiterLength) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        if (textChars[textOffset + typeTagLength] != ADBProtocol.TEXT_DELIMITER) {
+            throw getErrorReporter().errorInProtocol();
+        }
+        parsedLength = typeTagLength + delimiterLength;
+        return parsedTypeTag;
+    }
+
+    private byte getByteFromValidHexChars(char c0, char c1) throws SQLException {
+        return (byte) ((getValueFromValidHexChar(c0) << 4) + getValueFromValidHexChar(c1));
+    }
+
+    private int getValueFromValidHexChar(char c) throws SQLException {
+        if (c >= '0' && c <= '9') {
+            return c - '0';
+        }
+        if (c >= 'a' && c <= 'f') {
+            return 10 + c - 'a';
+        }
+        if (c >= 'A' && c <= 'F') {
+            return 10 + c - 'A';
+        }
+        throw getErrorReporter().errorInProtocol(String.valueOf(c));
+    }
+
+    private static int indexOf(char c, char[] array, int begin, int end) {
+        for (int i = begin; i < end; i++) {
+            if (array[i] == c) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private ADBErrorReporter getErrorReporter() {
+        return resultSet.getErrorReporter();
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
new file mode 100644
index 0000000..94ac5a3
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
@@ -0,0 +1,846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
+
+class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
+
+    static final List<Class<?>> SET_OBJECT_NON_ATOMIC = Arrays.asList(Object[].class, Collection.class, Map.class);
+
+    static final Map<Class<?>, AbstractValueSerializer> SERIALIZER_MAP = createSerializerMap();
+
+    protected final ADBConnection connection;
+    protected final String catalog;
+    protected final String schema;
+
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
+    protected volatile boolean closeOnCompletion;
+
+    protected int queryTimeoutSeconds;
+    protected long maxRows;
+
+    // common result fields
+    protected int updateCount = -1;
+    protected List<ADBProtocol.QueryServiceResponse.Message> warnings;
+
+    // executeQuery() result fields
+    protected final ConcurrentLinkedQueue<ADBResultSet> resultSetsWithResources;
+    protected final ConcurrentLinkedQueue<WeakReference<ADBResultSet>> resultSetsWithoutResources;
+
+    // execute() result field
+    protected ADBProtocol.QueryServiceResponse executeResponse;
+    protected ADBResultSet executeResultSet;
+
+    // Lifecycle
+
+    ADBStatement(ADBConnection connection, String catalog, String schema) {
+        this.connection = Objects.requireNonNull(connection);
+        this.catalog = catalog;
+        this.schema = schema;
+        this.resultSetsWithResources = new ConcurrentLinkedQueue<>();
+        this.resultSetsWithoutResources = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public void close() throws SQLException {
+        closeImpl(true, true);
+    }
+
+    void closeImpl(boolean closeResultSets, boolean notifyConnection) throws SQLException {
+        boolean wasClosed = closed.getAndSet(true);
+        if (wasClosed) {
+            return;
+        }
+        try {
+            if (closeResultSets) {
+                closeRegisteredResultSets();
+            }
+        } finally {
+            if (notifyConnection) {
+                connection.deregisterStatement(this);
+            }
+        }
+    }
+
+    @Override
+    public void closeOnCompletion() throws SQLException {
+        checkClosed();
+        closeOnCompletion = true;
+    }
+
+    @Override
+    public boolean isCloseOnCompletion() throws SQLException {
+        checkClosed();
+        return closeOnCompletion;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    protected void checkClosed() throws SQLException {
+        if (isClosed()) {
+            throw getErrorReporter().errorObjectClosed(Statement.class);
+        }
+    }
+
+    // Execution
+
+    @Override
+    public ADBResultSet executeQuery(String sql) throws SQLException {
+        checkClosed();
+        return executeQueryImpl(sql, null);
+    }
+
+    protected ADBResultSet executeQueryImpl(String sql, List<?> args) throws SQLException {
+        // note: we're not assigning executeResponse field at this method
+        ADBProtocol.QueryServiceResponse response =
+                connection.protocol.submitStatement(sql, args, true, false, queryTimeoutSeconds, catalog, schema);
+        boolean isQuery = connection.protocol.isStatementCategory(response,
+                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+        if (!isQuery) {
+            throw getErrorReporter().errorInvalidStatementCategory();
+        }
+        warnings = connection.protocol.getWarningIfExists(response);
+        updateCount = -1;
+        return fetchResultSet(response);
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql) throws SQLException {
+        checkClosed();
+        return executeUpdateImpl(sql, null);
+    }
+
+    @Override
+    public int executeUpdate(String sql) throws SQLException {
+        checkClosed();
+        return executeUpdateImpl(sql, null);
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeUpdate");
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeUpdate");
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeLargeUpdate");
+    }
+
+    @Override
+    public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeUpdate");
+    }
+
+    protected int executeUpdateImpl(String sql, List<Object> args) throws SQLException {
+        ADBProtocol.QueryServiceResponse response =
+                connection.protocol.submitStatement(sql, args, false, false, queryTimeoutSeconds, catalog, schema);
+        boolean isQuery = connection.protocol.isStatementCategory(response,
+                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+        // TODO: remove result set on the server (both query and update returning cases)
+        if (isQuery) {
+            throw getErrorReporter().errorInvalidStatementCategory();
+        }
+        warnings = connection.protocol.getWarningIfExists(response);
+        updateCount = connection.protocol.getUpdateCount(response);
+        return updateCount;
+    }
+
+    @Override
+    public boolean execute(String sql) throws SQLException {
+        checkClosed();
+        return executeImpl(sql, null);
+    }
+
+    @Override
+    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "execute");
+    }
+
+    @Override
+    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "execute");
+    }
+
+    @Override
+    public boolean execute(String sql, String[] columnNames) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "execute");
+    }
+
+    protected boolean executeImpl(String sql, List<Object> args) throws SQLException {
+        ADBProtocol.QueryServiceResponse response =
+                connection.protocol.submitStatement(sql, args, false, false, queryTimeoutSeconds, catalog, schema);
+        warnings = connection.protocol.getWarningIfExists(response);
+        boolean isQuery = connection.protocol.isStatementCategory(response,
+                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+        if (isQuery) {
+            updateCount = -1;
+            executeResponse = response;
+            return true;
+        } else {
+            updateCount = connection.protocol.getUpdateCount(response);
+            executeResponse = null;
+            return false;
+        }
+    }
+
+    @Override
+    public void cancel() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "cancel");
+    }
+
+    @Override
+    public int getQueryTimeout() throws SQLException {
+        checkClosed();
+        return queryTimeoutSeconds;
+    }
+
+    @Override
+    public void setQueryTimeout(int timeoutSeconds) throws SQLException {
+        checkClosed();
+        if (timeoutSeconds < 0) {
+            throw getErrorReporter().errorParameterValueNotSupported("timeoutSeconds");
+        }
+        queryTimeoutSeconds = timeoutSeconds;
+    }
+
+    @Override
+    public void setEscapeProcessing(boolean enable) throws SQLException {
+        checkClosed();
+    }
+
+    // Batch execution
+
+    @Override
+    public long[] executeLargeBatch() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeLargeBatch");
+    }
+
+    @Override
+    public int[] executeBatch() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "executeBatch");
+    }
+
+    @Override
+    public void addBatch(String sql) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "addBatch");
+    }
+
+    @Override
+    public void clearBatch() throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "clearBatch");
+    }
+
+    // Result access
+
+    @Override
+    public ADBResultSet getResultSet() throws SQLException {
+        checkClosed();
+        ADBProtocol.QueryServiceResponse response = executeResponse;
+        if (response == null) {
+            return null;
+        }
+        ADBResultSet rs = fetchResultSet(response);
+        executeResultSet = rs;
+        executeResponse = null;
+        return rs;
+    }
+
+    @Override
+    public boolean getMoreResults() throws SQLException {
+        return getMoreResults(Statement.CLOSE_ALL_RESULTS);
+    }
+
+    @Override
+    public boolean getMoreResults(int current) throws SQLException {
+        checkClosed();
+        ADBResultSet rs = executeResultSet;
+        executeResultSet = null;
+        if (rs != null && current != Statement.KEEP_CURRENT_RESULT) {
+            rs.closeImpl(true);
+        }
+        return false;
+    }
+
+    @Override
+    public int getResultSetType() throws SQLException {
+        checkClosed();
+        return ResultSet.TYPE_FORWARD_ONLY;
+    }
+
+    @Override
+    public int getResultSetConcurrency() throws SQLException {
+        checkClosed();
+        return ResultSet.CONCUR_READ_ONLY;
+    }
+
+    @Override
+    public int getResultSetHoldability() throws SQLException {
+        checkClosed();
+        return ADBResultSet.RESULT_SET_HOLDABILITY;
+    }
+
+    @Override
+    public ResultSet getGeneratedKeys() throws SQLException {
+        checkClosed();
+        return createEmptyResultSet();
+    }
+
+    @Override
+    public long getLargeUpdateCount() throws SQLException {
+        checkClosed();
+        return updateCount;
+    }
+
+    @Override
+    public int getUpdateCount() throws SQLException {
+        return (int) getLargeUpdateCount();
+    }
+
+    // ResultSet lifecycle
+
+    private ADBResultSet fetchResultSet(ADBProtocol.QueryServiceResponse execResponse) throws SQLException {
+        List<ADBColumn> columns = connection.protocol.getColumns(execResponse);
+        if (getLogger().isLoggable(Level.FINER)) {
+            getLogger().log(Level.FINE, "result schema " + columns);
+        }
+        if (connection.protocol.isExplainOnly(execResponse)) {
+            AbstractValueSerializer stringSer = getADMFormatSerializer(String.class);
+            ArrayNode explainResult = connection.protocol.fetchExplainOnlyResult(execResponse, stringSer);
+            return createSystemResultSet(columns, explainResult);
+        } else {
+            JsonParser rowParser = connection.protocol.fetchResult(execResponse);
+            return createResultSetImpl(columns, rowParser, true, maxRows);
+        }
+    }
+
+    protected ADBResultSet createSystemResultSet(List<ADBColumn> columns, ArrayNode values) {
+        JsonParser rowParser = connection.protocol.createJsonParser(values);
+        return createResultSetImpl(columns, rowParser, false, 0);
+    }
+
+    protected ADBResultSet createEmptyResultSet() {
+        ArrayNode empty = (ArrayNode) connection.protocol.driverContext.genericObjectReader.createArrayNode();
+        return createSystemResultSet(Collections.emptyList(), empty);
+    }
+
+    private ADBResultSet createResultSetImpl(List<ADBColumn> columns, JsonParser rowParser,
+            boolean rowParserOwnsResources, long maxRows) {
+        ADBResultSetMetaData metadata = new ADBResultSetMetaData(this, columns);
+        ADBResultSet rs = new ADBResultSet(metadata, rowParser, rowParserOwnsResources, maxRows);
+        registerResultSet(rs);
+        return rs;
+    }
+
+    private void registerResultSet(ADBResultSet rs) {
+        if (rs.rowParserOwnsResources) {
+            resultSetsWithResources.add(rs);
+        } else {
+            resultSetsWithoutResources.removeIf(ADBStatement::isEmptyReference);
+            resultSetsWithoutResources.add(new WeakReference<>(rs));
+        }
+    }
+
+    protected void deregisterResultSet(ADBResultSet rs) {
+        if (rs.rowParserOwnsResources) {
+            resultSetsWithResources.remove(rs);
+        } else {
+            resultSetsWithoutResources.removeIf(ref -> {
+                ADBResultSet refrs = ref.get();
+                return refrs == null || refrs == rs;
+            });
+        }
+        if (closeOnCompletion && resultSetsWithResources.isEmpty() && resultSetsWithoutResources.isEmpty()) {
+            try {
+                closeImpl(false, true);
+            } catch (SQLException e) {
+                // this exception shouldn't happen because there are no result sets to close
+                if (getLogger().isLoggable(Level.FINE)) {
+                    getLogger().log(Level.FINE, e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    private void closeRegisteredResultSets() throws SQLException {
+        SQLException err = null;
+        try {
+            closedRegisteredResultSetsImpl(resultSetsWithResources, Function.identity());
+        } catch (SQLException e) {
+            err = e;
+        }
+        try {
+            closedRegisteredResultSetsImpl(resultSetsWithoutResources, Reference::get);
+        } catch (SQLException e) {
+            if (err != null) {
+                e.addSuppressed(err);
+            }
+            err = e;
+        }
+        if (err != null) {
+            throw err;
+        }
+    }
+
+    private <T> void closedRegisteredResultSetsImpl(Queue<T> queue, Function<T, ADBResultSet> rsAccessor)
+            throws SQLException {
+        SQLException err = null;
+        T item;
+        while ((item = queue.poll()) != null) {
+            ADBResultSet rs = rsAccessor.apply(item);
+            if (rs != null) {
+                try {
+                    rs.closeImpl(false);
+                } catch (SQLException e) {
+                    if (err != null) {
+                        e.addSuppressed(err);
+                    }
+                    err = e;
+                }
+            }
+        }
+        if (err != null) {
+            throw err;
+        }
+    }
+
+    private static boolean isEmptyReference(Reference<ADBResultSet> ref) {
+        return ref.get() == null;
+    }
+
+    // Result control
+
+    @Override
+    public void setLargeMaxRows(long maxRows) throws SQLException {
+        checkClosed();
+        if (maxRows < 0) {
+            throw getErrorReporter().errorParameterValueNotSupported("maxRows");
+        }
+        this.maxRows = maxRows;
+    }
+
+    @Override
+    public void setMaxRows(int maxRows) throws SQLException {
+        setLargeMaxRows(maxRows);
+    }
+
+    @Override
+    public long getLargeMaxRows() throws SQLException {
+        checkClosed();
+        return maxRows;
+    }
+
+    @Override
+    public int getMaxRows() throws SQLException {
+        return (int) getLargeMaxRows();
+    }
+
+    @Override
+    public void setCursorName(String name) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "setCursorName");
+    }
+
+    // Unsupported hints (ignored)
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        checkClosed();
+        return ResultSet.FETCH_FORWARD;
+    }
+
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {
+        checkClosed();
+        switch (direction) {
+            case ResultSet.FETCH_FORWARD:
+            case ResultSet.FETCH_REVERSE:
+            case ResultSet.FETCH_UNKNOWN:
+                // ignore this hint
+                break;
+            default:
+                throw getErrorReporter().errorParameterValueNotSupported("direction");
+        }
+    }
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        checkClosed();
+        return 1;
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {
+        checkClosed();
+        if (rows < 0) {
+            throw getErrorReporter().errorParameterNotSupported("rows");
+        }
+    }
+
+    @Override
+    public int getMaxFieldSize() throws SQLException {
+        checkClosed();
+        return 0;
+    }
+
+    @Override
+    public void setMaxFieldSize(int maxFieldSize) throws SQLException {
+        throw getErrorReporter().errorMethodNotSupported(Statement.class, "setMaxFieldSize");
+    }
+
+    @Override
+    public boolean isPoolable() throws SQLException {
+        checkClosed();
+        return false;
+    }
+
+    @Override
+    public void setPoolable(boolean poolable) throws SQLException {
+        checkClosed();
+    }
+
+    // Errors and warnings
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        checkClosed();
+        return warnings != null ? connection.protocol.createSQLWarning(warnings) : null;
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        checkClosed();
+        warnings = null;
+    }
+
+    @Override
+    protected ADBErrorReporter getErrorReporter() {
+        return connection.getErrorReporter();
+    }
+
+    protected Logger getLogger() {
+        return connection.getLogger();
+    }
+
+    // Ownership
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        checkClosed();
+        return connection;
+    }
+
+    ADBStatement getResultSetStatement(ADBResultSet rs) {
+        return rs.metadata.statement;
+    }
+
+    // Serialization
+
+    static void configureSerialization(SimpleModule serdeModule) {
+        serdeModule.setSerializerModifier(createADMFormatSerializerModifier());
+    }
+
+    static AbstractValueSerializer getADMFormatSerializer(Class<?> cls) {
+        return SERIALIZER_MAP.get(cls);
+    }
+
+    private static BeanSerializerModifier createADMFormatSerializerModifier() {
+        return new BeanSerializerModifier() {
+            @Override
+            public JsonSerializer<?> modifySerializer(SerializationConfig config, BeanDescription beanDesc,
+                    JsonSerializer<?> serializer) {
+                Class<?> cls = beanDesc.getClassInfo().getAnnotated();
+                if (isSetObjectCompatible(cls)) {
+                    AbstractValueSerializer ser = getADMFormatSerializer(cls);
+                    return ser != null ? ser : super.modifySerializer(config, beanDesc, serializer);
+                } else {
+                    return null;
+                }
+            }
+        };
+    }
+
+    static boolean isSetObjectCompatible(Class<?> cls) {
+        if (ADBRowStore.OBJECT_ACCESSORS_ATOMIC.containsKey(cls)) {
+            return true;
+        }
+        for (Class<?> aClass : SET_OBJECT_NON_ATOMIC) {
+            if (aClass.isAssignableFrom(cls)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static Map<Class<?>, AbstractValueSerializer> createSerializerMap() {
+        Map<Class<?>, AbstractValueSerializer> serializerMap = new HashMap<>();
+        registerSerializer(serializerMap, createGenericSerializer(Byte.class, ADBDatatype.TINYINT));
+        registerSerializer(serializerMap, createGenericSerializer(Short.class, ADBDatatype.SMALLINT));
+        registerSerializer(serializerMap, createGenericSerializer(Integer.class, ADBDatatype.INTEGER));
+        registerSerializer(serializerMap, createGenericSerializer(UUID.class, ADBDatatype.UUID));
+        // Long is serialized as JSON number by Jackson
+        registerSerializer(serializerMap, createFloatSerializer());
+        registerSerializer(serializerMap, createDoubleSerializer());
+        registerSerializer(serializerMap, createStringSerializer());
+        registerSerializer(serializerMap, createSqlDateSerializer());
+        registerSerializer(serializerMap, createLocalDateSerializer());
+        registerSerializer(serializerMap, createSqlTimeSerializer());
+        registerSerializer(serializerMap, createLocalTimeSerializer());
+        registerSerializer(serializerMap, createSqlTimestampSerializer());
+        registerSerializer(serializerMap, createInstantSerializer());
+        registerSerializer(serializerMap, createPeriodSerializer());
+        registerSerializer(serializerMap, createDurationSerializer());
+        return serializerMap;
+    }
+
+    private static void registerSerializer(Map<Class<?>, AbstractValueSerializer> map,
+            AbstractValueSerializer serializer) {
+        map.put(serializer.getJavaType(), serializer);
+    }
+
+    private static ATaggedValueSerializer createGenericSerializer(Class<?> javaType, ADBDatatype ADBDatatype) {
+        return new ATaggedValueSerializer(javaType, ADBDatatype) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                out.append(value);
+            }
+        };
+    }
+
+    private static AbstractValueSerializer createStringSerializer() {
+        return new AbstractValueSerializer(java.lang.String.class) {
+            @Override
+            public void serialize(Object value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+                gen.writeString(serializeToString(value));
+            }
+
+            @Override
+            protected String serializeToString(Object value) {
+                return ADBProtocol.TEXT_DELIMITER + String.valueOf(value);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createFloatSerializer() {
+        return new ATaggedValueSerializer(Float.class, ADBDatatype.FLOAT) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                int bits = Float.floatToIntBits((Float) value);
+                out.append((long) bits);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createDoubleSerializer() {
+        return new ATaggedValueSerializer(Double.class, ADBDatatype.DOUBLE) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long bits = Double.doubleToLongBits((Double) value);
+                out.append(bits);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createSqlDateSerializer() {
+        return new ATaggedValueSerializer(java.sql.Date.class, ADBDatatype.DATE) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = ((Date) value).getTime();
+                out.append(millis);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createLocalDateSerializer() {
+        return new ATaggedValueSerializer(java.time.LocalDate.class, ADBDatatype.DATE) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = TimeUnit.DAYS.toMillis(((LocalDate) value).toEpochDay());
+                out.append(millis);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createSqlTimeSerializer() {
+        return new ATaggedValueSerializer(java.sql.Time.class, ADBDatatype.TIME) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = ((Time) value).getTime();
+                out.append(millis);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createLocalTimeSerializer() {
+        return new ATaggedValueSerializer(java.time.LocalTime.class, ADBDatatype.TIME) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = TimeUnit.NANOSECONDS.toMillis(((LocalTime) value).toNanoOfDay());
+                out.append(millis);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createSqlTimestampSerializer() {
+        return new ATaggedValueSerializer(java.sql.Timestamp.class, ADBDatatype.DATETIME) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = ((Timestamp) value).getTime();
+                out.append(millis);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createInstantSerializer() {
+        return new ATaggedValueSerializer(java.time.Instant.class, ADBDatatype.DATETIME) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = ((Instant) value).toEpochMilli();
+                out.append(millis);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createPeriodSerializer() {
+        return new ATaggedValueSerializer(java.time.Period.class, ADBDatatype.YEARMONTHDURATION) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long months = ((Period) value).toTotalMonths();
+                out.append(months);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createDurationSerializer() {
+        return new ATaggedValueSerializer(java.time.Duration.class, ADBDatatype.DAYTIMEDURATION) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = ((Duration) value).toMillis();
+                out.append(millis);
+            }
+        };
+    }
+
+    static abstract class AbstractValueSerializer extends JsonSerializer<Object> {
+
+        protected final Class<?> javaType;
+
+        protected AbstractValueSerializer(Class<?> javaType) {
+            this.javaType = Objects.requireNonNull(javaType);
+        }
+
+        protected Class<?> getJavaType() {
+            return javaType;
+        }
+
+        abstract String serializeToString(Object value);
+    }
+
+    private static abstract class ATaggedValueSerializer extends AbstractValueSerializer {
+
+        protected final ADBDatatype adbType;
+
+        protected ATaggedValueSerializer(Class<?> javaType, ADBDatatype adbType) {
+            super(javaType);
+            this.adbType = Objects.requireNonNull(adbType);
+        }
+
+        @Override
+        public void serialize(Object value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+            gen.writeString(serializeToString(value)); // TODO:optimize?
+        }
+
+        protected final String serializeToString(Object value) {
+            StringBuilder textBuilder = new StringBuilder(64); // TODO:optimize?
+            printByteAsHex(adbType.getTypeTag(), textBuilder);
+            textBuilder.append(ADBProtocol.TEXT_DELIMITER);
+            serializeNonTaggedValue(value, textBuilder);
+            return textBuilder.toString();
+        }
+
+        protected abstract void serializeNonTaggedValue(Object value, StringBuilder out);
+
+        private static void printByteAsHex(byte b, StringBuilder out) {
+            out.append(hex((b >>> 4) & 0x0f));
+            out.append(hex(b & 0x0f));
+        }
+
+        private static char hex(int i) {
+            return (char) (i + (i < 10 ? '0' : ('A' - 10)));
+        }
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBWrapperSupport.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBWrapperSupport.java
new file mode 100644
index 0000000..30cbfd0
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBWrapperSupport.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc.core;
+
+import java.sql.SQLException;
+import java.sql.Wrapper;
+
+abstract class ADBWrapperSupport implements Wrapper {
+
+    @Override
+    public final boolean isWrapperFor(Class<?> iface) {
+        return iface.isInstance(this);
+    }
+
+    @Override
+    public final <T> T unwrap(Class<T> iface) throws SQLException {
+        if (!iface.isInstance(this)) {
+            throw getErrorReporter().errorUnwrapTypeMismatch(iface);
+        }
+        return iface.cast(this);
+    }
+
+    protected abstract ADBErrorReporter getErrorReporter();
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-driver/pom.xml b/asterixdb-jdbc/asterix-jdbc-driver/pom.xml
new file mode 100644
index 0000000..afac089
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-driver/pom.xml
@@ -0,0 +1,137 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>apache-asterixdb</artifactId>
+    <groupId>org.apache.asterix</groupId>
+    <version>0.9.7-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-jdbc-driver</artifactId>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+
+  <properties>
+    <root.dir>${basedir}/..</root.dir>
+    <source.jdk.version>1.8</source.jdk.version>
+    <target.jdk.version>1.8</target.jdk.version>
+    <source.java.package.path>org/apache/asterix/jdbc/</source.java.package.path>
+    <implementation.title>Apache AsterixDB JDBC Driver</implementation.title>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-jdbc-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestSections>
+              <manifestSection>
+                <name>${source.java.package.path}</name>
+                <manifestEntries>
+                  <Implementation-Title>${implementation.title}</Implementation-Title>
+                  <Implementation-Version>${project.version}</Implementation-Version>
+                  <Implementation-Vendor>${project.organization.name}</Implementation-Vendor>
+                </manifestEntries>
+              </manifestSection>
+            </manifestSections>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <shadedClassifierName>dist</shadedClassifierName>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                  <addHeader>false</addHeader>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              </transformers>
+              <filters>
+                <filter>
+                  <artifact>org.apache.asterix:asterix-jdbc-core</artifact>
+                  <excludes>
+                    <exclude>META-INF/MANIFEST.MF</exclude>
+                  </excludes>
+                </filter>
+                <filter>
+                  <artifact>org.apache.httpcomponents:*</artifact>
+                  <excludes>
+                    <exclude>module-info.class</exclude>
+                    <exclude>META-INF/DEPENDENCIES</exclude>
+                    <exclude>META-INF/MANIFEST.MF</exclude>
+                  </excludes>
+                </filter>
+                <filter>
+                  <artifact>commons-logging:*</artifact>
+                  <excludes>
+                    <exclude>module-info.class</exclude>
+                    <exclude>META-INF/MANIFEST.MF</exclude>
+                  </excludes>
+                </filter>
+                <filter>
+                  <artifact>commons-codec:*</artifact>
+                  <excludes>
+                    <exclude>module-info.class</exclude>
+                    <exclude>META-INF/MANIFEST.MF</exclude>
+                  </excludes>
+                </filter>
+                <filter>
+                  <artifact>com.fasterxml.jackson.core:*</artifact>
+                  <excludes>
+                    <exclude>module-info.class</exclude>
+                    <exclude>META-INF/MANIFEST.MF</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java b/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java
new file mode 100644
index 0000000..4d175ed
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-driver/src/main/java/org/apache/asterix/jdbc/Driver.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.jdbc;
+
+import org.apache.asterix.jdbc.core.ADBDriverBase;
+
+public final class Driver extends ADBDriverBase implements java.sql.Driver {
+
+    private static final String DRIVER_SCHEME = "asterixdb:";
+
+    private static final int DEFAULT_API_PORT = 19002;
+
+    static {
+        setupLogging(Driver.class);
+        registerDriver(new Driver());
+    }
+
+    public Driver() {
+        super(DRIVER_SCHEME, DEFAULT_API_PORT);
+    }
+}
diff --git a/asterixdb-jdbc/asterix-jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver b/asterixdb-jdbc/asterix-jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver
new file mode 100644
index 0000000..626a039
--- /dev/null
+++ b/asterixdb-jdbc/asterix-jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.asterix.jdbc.Driver
\ No newline at end of file
