[ASTERIXDB-3390][STO]: Support GCS for cloud deployment
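
Add Google Cloud Storage (GCS) as a supported storage backend for cloud
deployments:
- new GCS cloud client (GCSClientConfig, GCSCloudClient, GCSBufferedWriter,
  GCSParallelDownloader) alongside the existing S3 client
- GCSExternalFileWriter and GCSExternalFileWriterFactory so 'gcs' is
  accepted as a writing adapter
- the build downloads and starts fake-gcs-server, and CloudStorageGCSTest
  runs the cloud storage test suite against it using
  cc-cloud-storage-gcs.conf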

Change-Id: I0b9cad99de2d32d1e672a3d396897807687685e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18253
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index dbca45f..27be120 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -107,6 +107,47 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+        <version>1.4.2</version>
+        <executions>
+          <execution>
+            <id>install-fake-gcs</id>
+            <phase>${gcs.download.stage}</phase>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <url>https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz</url>
+              <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
+              <outputDirectory>${project.build.directory}</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>extract-gcs</id>
+            <phase>${gcs.install.stage}</phase>
+            <configuration>
+              <target>
+                <echo message="Extracting fake-gcs-server" />
+                <mkdir dir="${project.build.directory}/fake-gcs-server" />
+                <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz" dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" />
+                <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" dest="${project.build.directory}/fake-gcs-server" />
+                <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.apache.asterix</groupId>
         <artifactId>asterix-test-datagenerator-maven-plugin</artifactId>
         <version>${asterix-test-datagenerator-maven-plugin.version}</version>
@@ -274,6 +315,30 @@
               <outputFile>${project.build.directory}/azurite/logs/azurite.log</outputFile>
             </configuration>
           </execution>
+          <execution>
+            <id>fake-gcs-server</id>
+            <phase>${gcs.stage}</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
+              <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
+              <arguments>
+                <argument>-port</argument>
+                <argument>4443</argument>
+                <argument>-scheme</argument>
+                <argument>http</argument>
+                <argument>-host</argument>
+                <argument>127.0.0.1</argument>
+                <argument>-log-level</argument>
+                <argument>error</argument>
+                <argument>-filesystem-root</argument>
+                <argument>${project.build.directory}/fake-gcs-server/storage</argument>
+              </arguments>
+              <async>true</async>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
@@ -448,7 +513,7 @@
     <profile>
       <id>asterix-gerrit-asterix-app</id>
       <properties>
-        <test.excludes>**/CloudStorageTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java</test.excludes>
+        <test.excludes>**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java</test.excludes>
         <itest.excludes>**/*.java</itest.excludes>
       </properties>
       <build>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
new file mode 100644
index 0000000..89a4781
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET;
+import static org.apache.asterix.api.common.LocalCloudUtil.MOCK_SERVER_REGION;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageClass;
+import com.google.cloud.storage.StorageOptions;
+
+/**
+ * Runs the cloud storage test suite against a local GCS emulator (fake-gcs-server).
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageGCSTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage-gcs.conf";
+    private static final String DELTA_RESULT_PATH = "results_cloud";
+    private static final String EXCLUDED_TESTS = "MP";
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:4443";
+    private static final String MOCK_SERVER_PROJECT_ID = "asterixdb-gcs-test-project-id";
+
+    public CloudStorageGCSTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        Storage storage = StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME)
+                .setCredentials(NoCredentials.getInstance()).setProjectId(MOCK_SERVER_PROJECT_ID).build().getService();
+        cleanup(storage);
+        initialize(storage);
+        storage.close();
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "CloudStorageGCSTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+
+    private static void cleanup(Storage storage) {
+        try {
+            Iterable<Blob> blobs = storage.list(CLOUD_STORAGE_BUCKET).iterateAll();
+            blobs.forEach(Blob::delete);
+            storage.delete(CLOUD_STORAGE_BUCKET);
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    private static void initialize(Storage storage) {
+        storage.create(BucketInfo.newBuilder(CLOUD_STORAGE_BUCKET).setStorageClass(StorageClass.STANDARD)
+                .setLocation(MOCK_SERVER_REGION).build());
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
new file mode 100644
index 0000000..d0ebd24
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
@@ -0,0 +1,72 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+credential.file=src/test/resources/security/passwd
+python.cmd.autolocate=true
+python.env=FOO=BAR=BAZ,BAR=BAZ
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+credential.file=src/test/resources/security/passwd
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
+storage.partitioning=static
+cloud.storage.scheme=gcs
+cloud.storage.bucket=cloud-storage-container
+cloud.storage.region=us-west2
+cloud.storage.endpoint=http://127.0.0.1:4443
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=lazy
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 9893336..54c32ef 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -79,7 +79,7 @@
     <test-case FilePath="copy-to/negative">
       <compilation-unit name="supported-adapter-format-compression">
         <output-dir compare="Text">supported-adapter-format-compression</output-dir>
-        <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [localfs, s3]</expected-error>
+        <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [gcs, localfs, s3]</expected-error>
         <expected-error>ASX1189: Unsupported writing format 'csv'. Supported formats: [json]</expected-error>
         <expected-error>ASX1096: Unknown compression scheme rar. Supported schemes are [gzip]</expected-error>
       </compilation-unit>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 3843267..36cb039 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -61,6 +61,77 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+        <version>1.4.2</version>
+        <executions>
+          <execution>
+            <id>install-fake-gcs</id>
+            <phase>${gcs.download.stage}</phase>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <url>https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz</url>
+              <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
+              <outputDirectory>${project.build.directory}</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>extract-gcs</id>
+            <phase>${gcs.install.stage}</phase>
+            <configuration>
+              <target>
+                <echo message="Extracting fake-gcs-server" />
+                <mkdir dir="${project.build.directory}/fake-gcs-server" />
+                <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz" dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" />
+                <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" dest="${project.build.directory}/fake-gcs-server" />
+                <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>fake-gcs-server</id>
+            <phase>${gcs.stage}</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
+              <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
+              <arguments>
+                <argument>-port</argument>
+                <argument>4443</argument>
+                <argument>-scheme</argument>
+                <argument>http</argument>
+                <argument>-host</argument>
+                <argument>127.0.0.1</argument>
+                <argument>-log-level</argument>
+                <argument>error</argument>
+                <argument>-filesystem-root</argument>
+                <argument>${project.build.directory}/fake-gcs-server/storage</argument>
+              </arguments>
+              <async>true</async>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
index f0b336d..a4223eb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -20,8 +20,9 @@
 
 import java.util.List;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 
 public interface IBulkOperationCallBack {
-    void call(List<FileReference> fileReferences);
+    void call(List<FileReference> fileReferences) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index 21450c4..cc511c7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -20,7 +20,10 @@
 
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
 import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class CloudClientProvider {
 
@@ -28,11 +31,14 @@
         throw new AssertionError("do not instantiate");
     }
 
-    public static ICloudClient getClient(CloudProperties cloudProperties) {
+    public static ICloudClient getClient(CloudProperties cloudProperties) throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
         if ("s3".equalsIgnoreCase(storageScheme)) {
             S3ClientConfig config = S3ClientConfig.of(cloudProperties);
             return new S3CloudClient(config);
+        } else if ("gcs".equalsIgnoreCase(storageScheme)) {
+            GCSClientConfig config = GCSClientConfig.of(cloudProperties);
+            return new GCSCloudClient(config);
         }
         throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
index 16e79f9..6b51964 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
@@ -31,7 +31,7 @@
      * @param length length
      * @return amount uploaded
      */
-    int upload(InputStream stream, int length);
+    int upload(InputStream stream, int length) throws HyracksDataException;
 
     /**
      * Checks whether the writer has not written anything
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 7941ada..2bd0802 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -139,7 +139,7 @@
      * @param ioManager local {@link IOManager}
      * @return an instance of a new parallel downloader
      */
-    IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager);
+    IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) throws HyracksDataException;
 
     /**
      * Produces a {@link JsonNode} that contains information about the stored objects in the cloud
@@ -153,5 +153,5 @@
     /**
      * Performs any necessary closing and cleaning up
      */
-    void close();
+    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 32d0a74..184e015 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -43,5 +43,5 @@
     /**
      * Close the downloader and release all of its resources
      */
-    void close();
+    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
new file mode 100644
index 0000000..4f9d437
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import static org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
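+/**
+ * {@link ICloudBufferedWriter} that streams buffered data to a GCS object through a {@link WriteChannel}.
+ */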
+public class GCSBufferedWriter implements ICloudBufferedWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final String bucket;
+    private final String path;
+    private final IRequestProfiler profiler;
+    private final Storage gcsClient;
+    private boolean uploadStarted = false;
+    private int partNumber;
+    private WriteChannel writer = null;
+
+    public GCSBufferedWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) {
+        this.bucket = bucket;
+        this.path = path;
+        this.profiler = profiler;
+        this.gcsClient = gcsClient;
+    }
+
+    @Override
+    public int upload(InputStream stream, int length) throws HyracksDataException {
+        profiler.objectMultipartUpload();
+        setUploadId();
+        try {
+            ByteBuffer buffer = ByteBuffer.wrap(stream.readNBytes(length));
+            while (buffer.hasRemaining()) {
+                writer.write(buffer);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return partNumber++;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return !uploadStarted;
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        if (!uploadStarted) {
+            throw new IllegalStateException("Cannot finish without writing any bytes");
+        }
+        profiler.objectMultipartUpload();
+        try {
+            writer.close();
+            writer = null;
+            uploadStarted = false;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        log("FINISHED");
+    }
+
+    @Override
+    public void abort() {
+        // https://github.com/googleapis/java-storage/issues/202
+        // Cannot abort. Upload Ids and data are discarded after a week
+        writer = null;
+        LOGGER.warn("Multipart upload for {} was aborted", path);
+    }
+
+    private void setUploadId() {
+        if (!uploadStarted) {
+            uploadStarted = true;
+            partNumber = 1;
+            writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build());
+            writer.setChunkSize(MIN_BUFFER_SIZE);
+            log("STARTED");
+        }
+    }
+
+    private void log(String op) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("{} multipart upload for {}", op, path);
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
new file mode 100644
index 0000000..e8e4480
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.cloud.NoCredentials;
+
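+/**
+ * Configuration holder used to build {@link GCSCloudClient} instances, populated either from
+ * {@link CloudProperties} (cloud deployment) or from an external-writer configuration map.
+ */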
+public class GCSClientConfig {
+    // The maximum number of objects that can be deleted in a single batch request (GCS restriction): https://cloud.google.com/storage/quotas#json-requests
+    static final int DELETE_BATCH_SIZE = 100;
+    private final String region;
+    private final String endpoint;
+    private final String prefix;
+    private final boolean anonymousAuth;
+    private final long profilerLogInterval;
+
+    public GCSClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
+            long profilerLogInterval) {
+        this.region = region;
+        this.endpoint = endpoint;
+        this.prefix = prefix;
+        this.anonymousAuth = anonymousAuth;
+        this.profilerLogInterval = profilerLogInterval;
+    }
+
+    public static GCSClientConfig of(CloudProperties cloudProperties) {
+        return new GCSClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
+                cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
+                cloudProperties.getProfilerLogInterval());
+    }
+
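+    // Builds a config from an external collection/writer configuration map; only the endpoint is read here.
+    // Credentials for this path are resolved separately (see GCSUtils.buildClient in GCSExternalFileWriterFactory).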
+    public static GCSClientConfig of(Map<String, String> configuration) {
+        String endPoint = configuration.getOrDefault(ENDPOINT_FIELD_NAME, "");
+        long profilerLogInterval = 0;
+
+        String region = "";
+        String prefix = "";
+        boolean anonymousAuth = false;
+
+        return new GCSClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval);
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public long getProfilerLogInterval() {
+        return profilerLogInterval;
+    }
+
+    public boolean isAnonymousAuth() {
+        return anonymousAuth;
+    }
+
+    public OAuth2Credentials createCredentialsProvider() throws HyracksDataException {
+        try {
+            return anonymousAuth ? NoCredentials.getInstance() : GoogleCredentials.getApplicationDefault();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
new file mode 100644
index 0000000..2b7303d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.DELETE_BATCH_SIZE;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.gax.paging.Page;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.CopyRequest;
+import com.google.cloud.storage.StorageBatch;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+
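+/**
+ * {@link ICloudClient} implementation backed by the Google Cloud Storage SDK.
+ */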
+public class GCSCloudClient implements ICloudClient {
+
+    private final Storage gcsClient;
+    private final GCSClientConfig config;
+    private final IRequestProfiler profiler;
+
+    public GCSCloudClient(GCSClientConfig config, Storage gcsClient) {
+        this.gcsClient = gcsClient;
+        this.config = config;
+        long profilerInterval = config.getProfilerLogInterval();
+        if (profilerInterval > 0) {
+            profiler = new CountRequestProfiler(profilerInterval);
+        } else {
+            profiler = NoOpRequestProfiler.INSTANCE;
+        }
+    }
+
+    public GCSCloudClient(GCSClientConfig config) throws HyracksDataException {
+        this(config, buildClient(config));
+    }
+
+    private static Storage buildClient(GCSClientConfig config) throws HyracksDataException {
+        StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
+
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.setHost(config.getEndpoint());
+        }
+        return builder.build().getService();
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        return new GCSBufferedWriter(bucket, path, gcsClient, profiler);
+    }
+
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        profiler.objectsList();
+        Page<Blob> blobs =
+                gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
+
+        Set<String> files = new HashSet<>();
+        for (Blob blob : blobs.iterateAll()) {
+            if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
+                files.add(blob.getName());
+            }
+        }
+        return files;
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        profiler.objectGet();
+        BlobId blobId = BlobId.of(bucket, path);
+        long readTo = offset + buffer.remaining();
+        int totalRead = 0;
+        int read = 0;
+        try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
+            while (buffer.remaining() > 0) {
+                from.seek(offset + totalRead);
+                read = from.read(buffer);
+                if (read < 0) {
+                    // premature end of object; the size check below reports the shortfall
+                    break;
+                }
+                totalRead += read;
+            }
+        } catch (IOException | StorageException ex) {
+            throw HyracksDataException.create(ex);
+        }
+
+        if (buffer.remaining() != 0) {
+            throw new IllegalStateException("Expected buffer remaining = 0, found: " + buffer.remaining());
+        }
+        return totalRead;
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) {
+        profiler.objectGet();
+        BlobId blobId = BlobId.of(bucket, path);
+        try {
+            return gcsClient.readAllBytes(blobId);
+        } catch (StorageException e) {
+            return null;
+        }
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        profiler.objectGet();
+        try {
+            // do not close the channel here; closing the returned stream closes the underlying channel
+            return Channels.newInputStream(gcsClient.reader(bucket, path));
+        } catch (StorageException ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        profiler.objectWrite();
+        BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
+        gcsClient.create(blobInfo, data);
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
+        profiler.objectsList();
+        for (Blob blob : blobs.iterateAll()) {
+            profiler.objectCopy();
+            BlobId source = blob.getBlobId();
+            String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+            BlobId target = BlobId.of(bucket, targetName);
+            CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+            gcsClient.copy(copyReq);
+        }
+    }
+
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        if (paths.isEmpty()) {
+            return;
+        }
+
+        StorageBatch batchRequest;
+        Iterator<String> pathIter = paths.iterator();
+        while (pathIter.hasNext()) {
+            batchRequest = gcsClient.batch();
+            for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
+                BlobId blobId = BlobId.of(bucket, pathIter.next());
+                batchRequest.delete(blobId);
+            }
+
+            batchRequest.submit();
+            profiler.objectDelete();
+        }
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        profiler.objectGet();
+        Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        if (blob == null) {
+            return 0;
+        }
+        return blob.getSize();
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) {
+        profiler.objectGet();
+        Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        return blob != null;
+    }
+
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) {
+        profiler.objectsList();
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
+        // hasNextPage() only indicates whether more pages exist; check the current page for any results
+        return !blobs.getValues().iterator().hasNext();
+    }
+
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager)
+            throws HyracksDataException {
+        return new GCSParallelDownloader(bucket, ioManager, config, profiler);
+    }
+
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
+        ArrayNode objectsInfo = objectMapper.createArrayNode();
+
+        List<Blob> objects = new ArrayList<>();
+        blobs.iterateAll().forEach(objects::add);
+        objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.getName(), y.getName()));
+        for (Blob blob : objects) {
+            ObjectNode objectInfo = objectsInfo.addObject();
+            objectInfo.put("path", blob.getName());
+            objectInfo.put("size", blob.getSize());
+        }
+        return objectsInfo;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            gcsClient.close();
+        } catch (Exception ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
new file mode 100644
index 0000000..281a855
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.cloud.storage.transfermanager.DownloadJob;
+import com.google.cloud.storage.transfermanager.DownloadResult;
+import com.google.cloud.storage.transfermanager.ParallelDownloadConfig;
+import com.google.cloud.storage.transfermanager.TransferManager;
+import com.google.cloud.storage.transfermanager.TransferManagerConfig;
+import com.google.cloud.storage.transfermanager.TransferStatus;
+
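+/**
+ * {@link IParallelDownloader} that downloads GCS objects in parallel using the {@link TransferManager}.
+ */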
+public class GCSParallelDownloader implements IParallelDownloader {
+
+    private final String bucket;
+    private final IOManager ioManager;
+    private final Storage gcsClient;
+    private final TransferManager transferManager;
+    private final IRequestProfiler profiler;
+
+    public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config, IRequestProfiler profiler)
+            throws HyracksDataException {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.profiler = profiler;
+        StorageOptions.Builder builder = StorageOptions.newBuilder();
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.setHost(config.getEndpoint());
+        }
+        builder.setCredentials(config.createCredentialsProvider());
+        this.gcsClient = builder.build().getService();
+        this.transferManager =
+                TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService();
+    }
+
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+        Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
+        try {
+            for (FileReference fileReference : toDownload) {
+                profiler.objectGet();
+                FileUtils.createParentDirectories(fileReference.getFile());
+                addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(),
+                        BlobInfo.newBuilder(BlobId.of(bucket, fileReference.getRelativePath())).build());
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
+        for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
+            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
+                    config.setDownloadDirectory(entry.getKey()).build()));
+        }
+        downloadJobs.forEach(DownloadJob::getDownloadResults);
+    }
+
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+
+        Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
+        for (FileReference fileReference : toDownload) {
+            profiler.objectMultipartDownload();
+            Page<Blob> blobs = gcsClient.list(bucket, Storage.BlobListOption.prefix(fileReference.getRelativePath()));
+            for (Blob blob : blobs.iterateAll()) {
+                addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo());
+            }
+        }
+        List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
+        for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
+            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
+                    config.setDownloadDirectory(entry.getKey()).build()));
+        }
+        List<DownloadResult> results;
+        for (DownloadJob job : downloadJobs) {
+            results = job.getDownloadResults();
+            for (DownloadResult result : results) {
+                if (result.getStatus() != TransferStatus.SUCCESS) {
+                    FileReference failedFile = ioManager.resolve(result.getInput().getName());
+                    failedFiles.add(failedFile);
+                }
+            }
+        }
+        return failedFiles;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            transferManager.close();
+            gcsClient.close();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
+        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
index 6600356..e3d978d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -73,5 +73,5 @@
     /**
      * Close cacher resources
      */
-    void close();
+    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 3cc481e..95330d9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -157,7 +157,7 @@
     }
 
     @Override
-    public void close() {
+    public void close() throws HyracksDataException {
         downloader.close();
         LOGGER.info("Parallel cacher was closed");
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index bfa353a..277d425 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -175,7 +175,7 @@
         return partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
     }
 
-    private void replace() {
+    private void replace() throws HyracksDataException {
         cacher.close();
         replacer.replace();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriter.java
new file mode 100644
index 0000000..4090ad6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+import com.google.cloud.BaseServiceException;
+
+final class GCSExternalFileWriter extends AbstractCloudExternalFileWriter {
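+    // GCS object names are limited to 1,024 bytes when UTF-8 encoded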
+    static final int MAX_LENGTH_IN_BYTES = 1024;
+
+    GCSExternalFileWriter(IExternalPrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath,
+            IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+        super(printer, cloudClient, bucket, partitionedPath, warningCollector, pathSourceLocation);
+    }
+
+    @Override
+    String getAdapterName() {
+        return ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
+    }
+
+    @Override
+    int getPathMaxLengthInBytes() {
+        return MAX_LENGTH_IN_BYTES;
+    }
+
+    @Override
+    boolean isSdkException(Exception e) {
+        return e instanceof BaseServiceException;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
new file mode 100644
index 0000000..ca93ff6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import static org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.StorageException;
+
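+/**
+ * {@link IExternalFileWriterFactory} for writing COPY TO results to Google Cloud Storage.
+ */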
+public final class GCSExternalFileWriterFactory implements IExternalFileWriterFactory {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    static final char SEPARATOR = '/';
+    public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+        @Override
+        public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+            return new GCSExternalFileWriterFactory(configuration);
+        }
+
+        @Override
+        public char getSeparator() {
+            return SEPARATOR;
+        }
+    };
+    private final Map<String, String> configuration;
+    private final SourceLocation pathSourceLocation;
+    private final String staticPath;
+    private transient GCSCloudClient cloudClient;
+
+    private GCSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+        configuration = externalConfig.getConfiguration();
+        pathSourceLocation = externalConfig.getPathSourceLocation();
+        staticPath = externalConfig.getStaticPath();
+        cloudClient = null;
+    }
+
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+            throws HyracksDataException {
+        buildClient();
+        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        IExternalPrinter printer = printerFactory.createPrinter();
+        IWarningCollector warningCollector = context.getWarningCollector();
+        return new GCSExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
+                pathSourceLocation);
+    }
+
+    private void buildClient() throws HyracksDataException {
+        try {
+            synchronized (this) {
+                if (cloudClient == null) {
+                    GCSClientConfig config = GCSClientConfig.of(configuration);
+                    cloudClient = new GCSCloudClient(config, GCSUtils.buildClient(configuration));
+                }
+            }
+        } catch (CompilationException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public char getSeparator() {
+        return SEPARATOR;
+    }
+
+    @Override
+    public void validate() throws AlgebricksException {
+        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        if (bucket == null || bucket.isEmpty()) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+                    ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        }
+
+        GCSClientConfig config = GCSClientConfig.of(configuration);
+        ICloudClient testClient = new GCSCloudClient(config, GCSUtils.buildClient(configuration));
+
+        try {
+            doValidate(testClient, bucket);
+        } catch (IOException e) {
+            if (e.getCause() instanceof StorageException) {
+                throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
+            } else {
+                throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
+                        ExceptionUtils.getMessageOrToString(e));
+            }
+        } catch (BaseServiceException e) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, e, getMessageOrToString(e));
+        }
+    }
+
+    private void doValidate(ICloudClient testClient, String bucket) throws IOException, AlgebricksException {
+        if (staticPath != null) {
+            if (isExceedingMaxLength(staticPath, GCSExternalFileWriter.MAX_LENGTH_IN_BYTES)) {
+                throw new CompilationException(ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH, pathSourceLocation,
+                        staticPath, GCSExternalFileWriter.MAX_LENGTH_IN_BYTES,
+                        ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+            }
+
+            if (!testClient.isEmptyPrefix(bucket, staticPath)) {
+                throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+            }
+        }
+
+        String validateWritePermissions = configuration
+                .getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION, Boolean.TRUE.toString());
+        if (!Boolean.parseBoolean(validateWritePermissions)) {
+            return;
+        }
+
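+        // Pick a probe object name that does not already exist in the bucket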
+        Random random = new Random();
+        String pathPrefix = "testFile";
+        String path = pathPrefix + random.nextInt();
+        while (testClient.exists(bucket, path)) {
+            path = pathPrefix + random.nextInt();
+        }
+
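+        // Write a random long to the probe object; it is read back and deleted below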
+        long writeValue = random.nextLong();
+        byte[] data = new byte[Long.BYTES];
+        LongPointable.setLong(data, 0, writeValue);
+        ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, path);
+        CloudResettableInputStream stream = null;
+        boolean aborted = false;
+        try {
+            stream = new CloudResettableInputStream(writer, new WriterSingleBufferProvider());
+            stream.write(data, 0, data.length);
+        } catch (HyracksDataException e) {
+            if (stream != null) {
+                stream.abort();
+            }
+            aborted = true;
+        } finally {
+            if (stream != null && !aborted) {
+                stream.finish();
+                stream.close();
+            }
+        }
+
+        try {
+            long readValue = LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+            if (writeValue != readValue) {
+                LOGGER.warn(
+                        "The writer can write, but the written value wasn't successfully read back (wrote: {}, read: {})",
+                        writeValue, readValue);
+            }
+        } finally {
+            testClient.deleteObjects(bucket, Collections.singleton(path));
+        }
+    }
+}
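
A minimal standalone sketch of the same write-permission probe that doValidate() performs, written directly against the google-cloud-storage API and a locally running fake-gcs-server (http on port 4443, as the build configures it). The class, bucket, and project names are placeholders, and the bucket is assumed to already exist:

    import java.nio.ByteBuffer;
    import java.util.Random;

    import com.google.cloud.NoCredentials;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class GcsWriteProbeSketch {
        public static void main(String[] args) {
            // Point the client at the local emulator with anonymous credentials.
            Storage storage = StorageOptions.newBuilder()
                    .setHost("http://127.0.0.1:4443")
                    .setCredentials(NoCredentials.getInstance())
                    .setProjectId("probe-project") // placeholder
                    .build()
                    .getService();

            Random random = new Random();
            String bucket = "probe-bucket"; // placeholder; assumed to already exist
            String path = "testFile" + random.nextInt();
            long writeValue = random.nextLong();
            byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(writeValue).array();

            BlobId blobId = BlobId.of(bucket, path);
            try {
                // Write the probe object and read the payload back.
                storage.create(BlobInfo.newBuilder(blobId).build(), data);
                long readValue = ByteBuffer.wrap(storage.readAllBytes(blobId)).getLong();
                if (writeValue != readValue) {
                    System.err.println("Probe mismatch: wrote " + writeValue + ", read " + readValue);
                }
            } finally {
                // Always remove the probe object, mirroring the factory's finally block.
                storage.delete(blobId);
            }
        }
    }

The factory itself writes through ICloudBufferedWriter and CloudResettableInputStream rather than the Storage API, but the round trip is the same: write a random long, read it back, compare, and delete the probe object.
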
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
new file mode 100644
index 0000000..87a3e29
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.gcs;
+
+import org.apache.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageClass;
+import com.google.cloud.storage.StorageOptions;
+
+public class LSMGCSTest extends LSMTest {
+    private static Storage client;
+    private static final int MOCK_SERVER_PORT = 4443;
+    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+    private static final String MOCK_SERVER_REGION = "us-west2"; // the value does not matter for the mock server
+    private static final String MOCK_SERVER_PROJECT_ID = "asterixdb-gcs-test-project-id";
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        client = StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME).setCredentials(NoCredentials.getInstance())
+                .setProjectId(MOCK_SERVER_PROJECT_ID).build().getService();
+
+        cleanup();
+        client.create(BucketInfo.newBuilder(PLAYGROUND_CONTAINER).setStorageClass(StorageClass.STANDARD)
+                .setLocation(MOCK_SERVER_REGION).build());
+        LOGGER.info("Client created successfully");
+        GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0);
+        CLOUD_CLIENT = new GCSCloudClient(config);
+    }
+
+    private static void cleanup() {
+        try {
+            Iterable<Blob> blobs = client.list(PLAYGROUND_CONTAINER).iterateAll();
+            blobs.forEach(Blob::delete);
+            client.delete(PLAYGROUND_CONTAINER);
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        // Shut down the GCS mock client
+        LOGGER.info("Shutting down GCS mock client");
+        if (client != null) {
+            client.close();
+        }
+        LOGGER.info("GCS mock client shut down successfully");
+    }
+}
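
A quick reachability check, assuming the gcs-tests profile below has downloaded, extracted, and started fake-gcs-server on 127.0.0.1:4443; the class, bucket, and project names are placeholders. It uses the same anonymous-credentials setup as LSMGCSTest:

    import com.google.cloud.NoCredentials;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BucketInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class FakeGcsSmokeCheck {
        public static void main(String[] args) {
            Storage client = StorageOptions.newBuilder()
                    .setHost("http://127.0.0.1:4443") // matches the exec-maven-plugin arguments
                    .setCredentials(NoCredentials.getInstance())
                    .setProjectId("smoke-check-project") // placeholder
                    .build()
                    .getService();

            String bucket = "smoke-check-bucket"; // placeholder
            client.create(BucketInfo.of(bucket));
            for (Blob blob : client.list(bucket).iterateAll()) {
                System.out.println("Unexpected pre-existing blob: " + blob.getName());
            }
            client.delete(bucket);
            System.out.println("fake-gcs-server is reachable");
        }
    }
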
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 98ccbce..5dfa803 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -320,7 +320,8 @@
 
     static {
         WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
-        WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
+        WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
+                KEY_ADAPTER_NAME_GCS.toLowerCase());
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index cf51200..8ca428b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.zip.Deflater;
 
+import org.apache.asterix.cloud.writer.GCSExternalFileWriterFactory;
 import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -51,6 +52,7 @@
         CREATOR_MAP = new HashMap<>();
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
+        addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, GCSExternalFileWriterFactory.PROVIDER);
     }
 
     public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
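
A simplified, self-contained sketch of the registration pattern this hunk extends; MiniProvider and the lookup helper are illustrative stand-ins rather than the real IExternalFileWriterFactoryProvider/ExternalWriterProvider API. In the sketch, adapter names are registered lower-cased so lookups are case-insensitive, matching the lower-cased adapter set in ExternalDataConstants:

    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    public class WriterRegistrySketch {
        // Illustrative stand-in for IExternalFileWriterFactoryProvider.
        interface MiniProvider {
            String describe();
        }

        private static final Map<String, MiniProvider> CREATOR_MAP = new HashMap<>();

        static {
            addCreator("localfs", () -> "local filesystem writer");
            addCreator("S3", () -> "S3 writer");
            addCreator("GCS", () -> "GCS writer"); // the entry this change adds
        }

        private static void addCreator(String adapterName, MiniProvider provider) {
            // Lower-case the key so adapter lookups are case-insensitive.
            CREATOR_MAP.put(adapterName.toLowerCase(Locale.ROOT), provider);
        }

        static MiniProvider lookup(String adapterName) {
            return CREATOR_MAP.get(adapterName.toLowerCase(Locale.ROOT));
        }

        public static void main(String[] args) {
            System.out.println(lookup("gcs").describe()); // -> GCS writer
        }
    }

With this entry in place, the GCS adapter name resolves to GCSExternalFileWriterFactory.PROVIDER just as the S3 adapter name resolves to S3ExternalFileWriterFactory.PROVIDER.
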
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 5bd1bdd..92a1285 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -84,6 +84,9 @@
     <azurite.npm.install.stage>none</azurite.npm.install.stage>
     <azurite.install.stage>none</azurite.install.stage>
     <azurite.stage>none</azurite.stage>
+    <gcs.download.stage>none</gcs.download.stage>
+    <gcs.install.stage>none</gcs.install.stage>
+    <gcs.stage>none</gcs.stage>
 
     <!-- Versions under dependencymanagement or used in many projects via properties -->
     <algebricks.version>0.3.10-SNAPSHOT</algebricks.version>
@@ -756,6 +759,22 @@
       </properties>
     </profile>
     <profile>
+      <id>gcs-tests</id>
+      <activation>
+        <os>
+          <family>unix</family>
+        </os>
+        <property>
+          <name>!skipTests</name>
+        </property>
+      </activation>
+      <properties>
+        <gcs.download.stage>process-classes</gcs.download.stage>
+        <gcs.install.stage>generate-test-resources</gcs.install.stage>
+        <gcs.stage>process-test-resources</gcs.stage>
+      </properties>
+    </profile>
+    <profile>
       <id>skip-npm</id>
       <activation>
         <property>