[ASTERIXDB-3390][STO]: Support GCS for cloud deployment
Change-Id: I0b9cad99de2d32d1e672a3d396897807687685e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18253
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index dbca45f..27be120 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -107,6 +107,47 @@
</executions>
</plugin>
<plugin>
+ <groupId>com.googlecode.maven-download-plugin</groupId>
+ <artifactId>download-maven-plugin</artifactId>
+ <version>1.4.2</version>
+ <executions>
+ <execution>
+ <id>install-fake-gcs</id>
+ <phase>${gcs.download.stage}</phase>
+ <goals>
+ <goal>wget</goal>
+ </goals>
+ <configuration>
+ <url>https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz</url>
+ <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
+ <outputDirectory>${project.build.directory}</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>extract-gcs</id>
+ <phase>${gcs.install.stage}</phase>
+ <configuration>
+ <target>
+ <echo message="Extracting fake-gcs-server" />
+ <mkdir dir="${project.build.directory}/fake-gcs-server" />
+ <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz" dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" />
+ <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" dest="${project.build.directory}/fake-gcs-server" />
+ <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-test-datagenerator-maven-plugin</artifactId>
<version>${asterix-test-datagenerator-maven-plugin.version}</version>
@@ -274,6 +315,30 @@
<outputFile>${project.build.directory}/azurite/logs/azurite.log</outputFile>
</configuration>
</execution>
+ <execution>
+ <id>fake-gcs-server</id>
+ <phase>${gcs.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
+ <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
+ <arguments>
+ <argument>-port</argument>
+ <argument>4443</argument>
+ <argument>-scheme</argument>
+ <argument>http</argument>
+ <argument>-host</argument>
+ <argument>127.0.0.1</argument>
+ <argument>-log-level</argument>
+ <argument>error</argument>
+ <argument>-filesystem-root</argument>
+ <argument>${project.build.directory}/fake-gcs-server/storage</argument>
+ </arguments>
+ <async>true</async>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
@@ -448,7 +513,7 @@
<profile>
<id>asterix-gerrit-asterix-app</id>
<properties>
- <test.excludes>**/CloudStorageTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java</test.excludes>
+ <test.excludes>**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java</test.excludes>
<itest.excludes>**/*.java</itest.excludes>
</properties>
<build>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
new file mode 100644
index 0000000..89a4781
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET;
+import static org.apache.asterix.api.common.LocalCloudUtil.MOCK_SERVER_REGION;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageClass;
+import com.google.cloud.storage.StorageOptions;
+
+/**
+ * Runs the cloud storage test suites with GCS as the storage scheme, against a GCS-compatible server that is
+ * expected to be listening at {@link #MOCK_SERVER_HOSTNAME}.
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageGCSTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage-gcs.conf";
+    private static final String DELTA_RESULT_PATH = "results_cloud";
+    // Description text of single-compilation-unit test cases that test() skips
+    private static final String EXCLUDED_TESTS = "MP";
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:4443";
+    private static final String MOCK_SERVER_PROJECT_ID = "asterixdb-gcs-test-project-id";
+
+    public CloudStorageGCSTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    /**
+     * Recreates the test bucket on the mock server, then sets up test execution through LangExecutionUtil.
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        // try-with-resources so the client is closed even when bucket cleanup or creation fails
+        try (Storage storage = StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME)
+                .setCredentials(NoCredentials.getInstance()).setProjectId(MOCK_SERVER_PROJECT_ID).build()
+                .getService()) {
+            cleanup(storage);
+            initialize(storage);
+        }
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "CloudStorageGCSTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    /**
+     * Runs the current test case unless it consists of a single compilation unit whose description is
+     * {@code EXCLUDED_TESTS}.
+     */
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+
+    /**
+     * Best-effort removal of the test bucket and its blobs; failures are logged and otherwise ignored.
+     */
+    private static void cleanup(Storage storage) {
+        try {
+            Iterable<Blob> blobs = storage.list(CLOUD_STORAGE_BUCKET).iterateAll();
+            blobs.forEach(Blob::delete);
+            storage.delete(CLOUD_STORAGE_BUCKET);
+        } catch (Exception ex) {
+            // Not fatal (e.g. nothing to delete yet); logged so a misbehaving mock server is diagnosable
+            LOGGER.debug("Ignoring failure while cleaning up bucket {}", CLOUD_STORAGE_BUCKET, ex);
+        }
+    }
+
+    /**
+     * Creates the test bucket with the standard storage class in {@code MOCK_SERVER_REGION}.
+     */
+    private static void initialize(Storage storage) {
+        storage.create(BucketInfo.newBuilder(CLOUD_STORAGE_BUCKET).setStorageClass(StorageClass.STANDARD)
+                .setLocation(MOCK_SERVER_REGION).build());
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
new file mode 100644
index 0000000..d0ebd24
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
@@ -0,0 +1,72 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+credential.file=src/test/resources/security/passwd
+python.cmd.autolocate=true
+python.env=FOO=BAR=BAZ,BAR=BAZ
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+credential.file=src/test/resources/security/passwd
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
+storage.partitioning=static
+cloud.storage.scheme=gcs
+cloud.storage.bucket=cloud-storage-container
+cloud.storage.region=us-west2
+cloud.storage.endpoint=http://127.0.0.1:4443
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=lazy
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 9893336..54c32ef 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -79,7 +79,7 @@
<test-case FilePath="copy-to/negative">
<compilation-unit name="supported-adapter-format-compression">
<output-dir compare="Text">supported-adapter-format-compression</output-dir>
- <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [localfs, s3]</expected-error>
+ <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [gcs, localfs, s3]</expected-error>
<expected-error>ASX1189: Unsupported writing format 'csv'. Supported formats: [json]</expected-error>
<expected-error>ASX1096: Unknown compression scheme rar. Supported schemes are [gzip]</expected-error>
</compilation-unit>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 3843267..36cb039 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -61,6 +61,77 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>com.googlecode.maven-download-plugin</groupId>
+ <artifactId>download-maven-plugin</artifactId>
+ <version>1.4.2</version>
+ <executions>
+ <execution>
+ <id>install-fake-gcs</id>
+ <phase>${gcs.download.stage}</phase>
+ <goals>
+ <goal>wget</goal>
+ </goals>
+ <configuration>
+ <url>https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz</url>
+ <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
+ <outputDirectory>${project.build.directory}</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>extract-gcs</id>
+ <phase>${gcs.install.stage}</phase>
+ <configuration>
+ <target>
+ <echo message="Extracting fake-gcs-server" />
+ <mkdir dir="${project.build.directory}/fake-gcs-server" />
+ <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz" dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" />
+ <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" dest="${project.build.directory}/fake-gcs-server" />
+ <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>fake-gcs-server</id>
+ <phase>${gcs.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
+ <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
+ <arguments>
+ <argument>-port</argument>
+ <argument>4443</argument>
+ <argument>-scheme</argument>
+ <argument>http</argument>
+ <argument>-host</argument>
+ <argument>127.0.0.1</argument>
+ <argument>-log-level</argument>
+ <argument>error</argument>
+ <argument>-filesystem-root</argument>
+ <argument>${project.build.directory}/fake-gcs-server/storage</argument>
+ </arguments>
+ <async>true</async>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
index f0b336d..a4223eb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -20,8 +20,9 @@
import java.util.List;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
public interface IBulkOperationCallBack {
- void call(List<FileReference> fileReferences);
+ void call(List<FileReference> fileReferences) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index 21450c4..cc511c7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -20,7 +20,10 @@
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class CloudClientProvider {
@@ -28,11 +31,14 @@
throw new AssertionError("do not instantiate");
}
- public static ICloudClient getClient(CloudProperties cloudProperties) {
+ public static ICloudClient getClient(CloudProperties cloudProperties) throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
if ("s3".equalsIgnoreCase(storageScheme)) {
S3ClientConfig config = S3ClientConfig.of(cloudProperties);
return new S3CloudClient(config);
+ } else if ("gcs".equalsIgnoreCase(storageScheme)) {
+ GCSClientConfig config = GCSClientConfig.of(cloudProperties);
+ return new GCSCloudClient(config);
}
throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
index 16e79f9..6b51964 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
@@ -31,7 +31,7 @@
* @param length length
* @return amount uploaded
*/
- int upload(InputStream stream, int length);
+ int upload(InputStream stream, int length) throws HyracksDataException;
/**
* Checks whether the writer has not written anything
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 7941ada..2bd0802 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -139,7 +139,7 @@
* @param ioManager local {@link IOManager}
* @return an instance of a new parallel downloader
*/
- IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager);
+ IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) throws HyracksDataException;
/**
* Produces a {@link JsonNode} that contains information about the stored objects in the cloud
@@ -153,5 +153,5 @@
/**
* Performs any necessary closing and cleaning up
*/
- void close();
+ void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 32d0a74..184e015 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -43,5 +43,5 @@
/**
* Close the downloader and release all of its resources
*/
- void close();
+ void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
new file mode 100644
index 0000000..4f9d437
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import static org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+/**
+ * {@link ICloudBufferedWriter} that streams the parts of one object to GCS through a {@link WriteChannel}.
+ * The channel is opened lazily by the first {@link #upload(InputStream, int)} and closed by {@link #finish()}.
+ */
+public class GCSBufferedWriter implements ICloudBufferedWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final String bucket;
+    private final String path;
+    private final IRequestProfiler profiler;
+    private final Storage gcsClient;
+    private boolean uploadStarted = false;
+    private int partNumber;
+    private WriteChannel writer = null;
+
+    public GCSBufferedWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) {
+        this.bucket = bucket;
+        this.path = path;
+        this.profiler = profiler;
+        this.gcsClient = gcsClient;
+    }
+
+    /**
+     * Reads up to {@code length} bytes from {@code stream} and writes all of them to the object's channel,
+     * opening the channel first if no upload is in progress.
+     *
+     * @return the number of the part that was just written (the first part is 1)
+     * @throws HyracksDataException if reading the stream or writing to the channel fails
+     */
+    @Override
+    public int upload(InputStream stream, int length) throws HyracksDataException {
+        profiler.objectMultipartUpload();
+        setUploadId();
+        try {
+            ByteBuffer buffer = ByteBuffer.wrap(stream.readNBytes(length));
+            // A channel write is allowed to consume only part of the buffer, so loop until it is drained
+            while (buffer.hasRemaining()) {
+                writer.write(buffer);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return partNumber++;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return !uploadStarted;
+    }
+
+    /**
+     * Closes the channel and resets the writer to its initial (empty) state.
+     *
+     * @throws IllegalStateException if no upload is in progress
+     * @throws HyracksDataException if closing the channel fails
+     */
+    @Override
+    public void finish() throws HyracksDataException {
+        if (!uploadStarted) {
+            throw new IllegalStateException("Cannot finish without writing any bytes");
+        }
+        profiler.objectMultipartUpload();
+        try {
+            writer.close();
+            writer = null;
+            uploadStarted = false;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        log("FINISHED");
+    }
+
+    /**
+     * Drops the channel without closing it and resets the writer to its initial (empty) state.
+     */
+    @Override
+    public void abort() {
+        // https://github.com/googleapis/java-storage/issues/202
+        // Cannot abort. Upload Ids and data are discarded after a week
+        writer = null;
+        // Keep the flag in sync with the dropped channel: otherwise isEmpty() would keep returning false and a
+        // later upload() would skip setUploadId() and dereference the null writer
+        uploadStarted = false;
+        LOGGER.warn("Multipart upload for {} was aborted", path);
+    }
+
+    /**
+     * Opens the write channel and restarts part numbering, unless an upload is already in progress.
+     */
+    private void setUploadId() {
+        if (!uploadStarted) {
+            uploadStarted = true;
+            partNumber = 1;
+            writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build());
+            writer.setChunkSize(MIN_BUFFER_SIZE);
+            log("STARTED");
+        }
+    }
+
+    private void log(String op) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("{} multipart upload for {}", op, path);
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
new file mode 100644
index 0000000..e8e4480
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.cloud.NoCredentials;
+
+/**
+ * Immutable settings used to build a GCS client: region, endpoint, object prefix, whether to authenticate
+ * anonymously, and the profiler log interval.
+ */
+public class GCSClientConfig {
+    // The maximum number of files that can be deleted (GCS restriction): https://cloud.google.com/storage/quotas#json-requests
+    static final int DELETE_BATCH_SIZE = 100;
+    private final String region;
+    private final String endpoint;
+    private final String prefix;
+    private final boolean anonymousAuth;
+    private final long profilerLogInterval;
+
+    public GCSClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
+            long profilerLogInterval) {
+        this.region = region;
+        this.endpoint = endpoint;
+        this.prefix = prefix;
+        this.anonymousAuth = anonymousAuth;
+        this.profilerLogInterval = profilerLogInterval;
+    }
+
+    /**
+     * Builds a configuration from the given cloud properties.
+     */
+    public static GCSClientConfig of(CloudProperties cloudProperties) {
+        String storageRegion = cloudProperties.getStorageRegion();
+        String storageEndpoint = cloudProperties.getStorageEndpoint();
+        String storagePrefix = cloudProperties.getStoragePrefix();
+        boolean anonymous = cloudProperties.isStorageAnonymousAuth();
+        long logInterval = cloudProperties.getProfilerLogInterval();
+        return new GCSClientConfig(storageRegion, storageEndpoint, storagePrefix, anonymous, logInterval);
+    }
+
+    /**
+     * Builds a configuration from a key/value map. Only the endpoint is read from the map (empty when absent);
+     * region and prefix are empty, anonymous authentication is off and the profiler log interval is 0.
+     */
+    public static GCSClientConfig of(Map<String, String> configuration) {
+        return new GCSClientConfig("", configuration.getOrDefault(ENDPOINT_FIELD_NAME, ""), "", false, 0);
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public boolean isAnonymousAuth() {
+        return anonymousAuth;
+    }
+
+    public long getProfilerLogInterval() {
+        return profilerLogInterval;
+    }
+
+    /**
+     * Returns {@link NoCredentials} when anonymous authentication is configured, otherwise the application
+     * default credentials.
+     *
+     * @throws HyracksDataException if the application default credentials cannot be obtained
+     */
+    public OAuth2Credentials createCredentialsProvider() throws HyracksDataException {
+        if (anonymousAuth) {
+            return NoCredentials.getInstance();
+        }
+        try {
+            return GoogleCredentials.getApplicationDefault();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
new file mode 100644
index 0000000..2b7303d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.DELETE_BATCH_SIZE;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.gax.paging.Page;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.CopyRequest;
+import com.google.cloud.storage.StorageBatch;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+
+/**
+ * {@link ICloudClient} implementation backed by Google Cloud Storage (GCS).
+ * <p>
+ * Requests are counted through an {@link IRequestProfiler}; a no-op profiler is used unless the
+ * configured profiler log interval is positive.
+ */
+public class GCSCloudClient implements ICloudClient {
+
+    private final Storage gcsClient;
+    private final GCSClientConfig config;
+    private final IRequestProfiler profiler;
+
+    /**
+     * Wraps an already-built storage client.
+     *
+     * @param config client configuration (supplies the profiler log interval)
+     * @param gcsClient storage client all requests are sent through
+     */
+    public GCSCloudClient(GCSClientConfig config, Storage gcsClient) {
+        this.gcsClient = gcsClient;
+        this.config = config;
+        long profilerInterval = config.getProfilerLogInterval();
+        if (profilerInterval > 0) {
+            profiler = new CountRequestProfiler(profilerInterval);
+        } else {
+            profiler = NoOpRequestProfiler.INSTANCE;
+        }
+    }
+
+    /**
+     * Builds the storage client from the configuration's credentials and optional endpoint.
+     *
+     * @param config client configuration
+     * @throws HyracksDataException if the credentials cannot be created
+     */
+    public GCSCloudClient(GCSClientConfig config) throws HyracksDataException {
+        this(config, buildClient(config));
+    }
+
+    /** Creates a storage client; the host is overridden only when a non-empty endpoint is configured. */
+    private static Storage buildClient(GCSClientConfig config) throws HyracksDataException {
+        StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
+
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.setHost(config.getEndpoint());
+        }
+        return builder.build().getService();
+    }
+
+    /** Creates a buffered writer targeting {@code path} in {@code bucket}. */
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        return new GCSBufferedWriter(bucket, path, gcsClient, profiler);
+    }
+
+    /**
+     * Lists the names of all objects under {@code path} whose file name is accepted by {@code filter}.
+     *
+     * @return the full object names (not just the file names) of the accepted objects
+     */
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        profiler.objectsList();
+        Page<Blob> blobs =
+                gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
+
+        Set<String> files = new HashSet<>();
+        for (Blob blob : blobs.iterateAll()) {
+            if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
+                files.add(blob.getName());
+            }
+        }
+        return files;
+    }
+
+    /**
+     * Fills {@code buffer} with the object's bytes starting at {@code offset}.
+     *
+     * @return the number of bytes read, which equals the buffer's initial remaining capacity
+     * @throws HyracksDataException if the storage request fails
+     * @throws IllegalStateException if the object ends before the buffer is full
+     */
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        profiler.objectGet();
+        BlobId blobId = BlobId.of(bucket, path);
+        long readTo = offset + buffer.remaining();
+        int totalRead = 0;
+        try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
+            while (buffer.remaining() > 0) {
+                from.seek(offset + totalRead);
+                int read = from.read(buffer);
+                if (read < 0) {
+                    // End of object reached before the buffer was filled. Without this guard the loop
+                    // never terminates and totalRead is decremented on every pass.
+                    break;
+                }
+                totalRead += read;
+            }
+        } catch (IOException | StorageException ex) {
+            throw HyracksDataException.create(ex);
+        }
+
+        if (buffer.remaining() != 0) {
+            throw new IllegalStateException("Expected buffer remaining = 0, found: " + buffer.remaining());
+        }
+        return totalRead;
+    }
+
+    /**
+     * Reads the whole object into memory.
+     *
+     * @return the object's bytes, or {@code null} if the storage request fails
+     */
+    @Override
+    public byte[] readAllBytes(String bucket, String path) {
+        profiler.objectGet();
+        BlobId blobId = BlobId.of(bucket, path);
+        try {
+            return gcsClient.readAllBytes(blobId);
+        } catch (StorageException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Opens a stream over the object's content. The caller owns the returned stream and must close it.
+     *
+     * @throws IllegalStateException if the reader cannot be created
+     */
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        profiler.objectGet();
+        try {
+            // The channel must stay open after this method returns: closing the returned stream is what
+            // closes the channel. Wrapping it in try-with-resources here would hand back a dead stream.
+            ReadChannel reader = gcsClient.reader(bucket, path);
+            return Channels.newInputStream(reader);
+        } catch (StorageException ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    /** Creates (or replaces) the object at {@code path} with {@code data}. */
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        profiler.objectWrite();
+        BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
+        gcsClient.create(blobInfo, data);
+    }
+
+    /**
+     * Copies every object under {@code srcPath} to a same-named child of {@code destPath} within the
+     * same bucket.
+     */
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
+        profiler.objectsList();
+        for (Blob blob : blobs.iterateAll()) {
+            profiler.objectCopy();
+            BlobId source = blob.getBlobId();
+            String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+            BlobId target = BlobId.of(bucket, targetName);
+            CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+            gcsClient.copy(copyReq);
+        }
+    }
+
+    /**
+     * Deletes the given objects in batches of at most {@code DELETE_BATCH_SIZE}; the profiler is
+     * notified once per submitted batch.
+     */
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        if (paths.isEmpty()) {
+            return;
+        }
+
+        StorageBatch batchRequest;
+        Iterator<String> pathIter = paths.iterator();
+        while (pathIter.hasNext()) {
+            batchRequest = gcsClient.batch();
+            for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
+                BlobId blobId = BlobId.of(bucket, pathIter.next());
+                batchRequest.delete(blobId);
+            }
+
+            batchRequest.submit();
+            profiler.objectDelete();
+        }
+    }
+
+    /** Returns the object's size in bytes, or 0 when the object does not exist. */
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        profiler.objectGet();
+        Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        if (blob == null) {
+            return 0;
+        }
+        return blob.getSize();
+    }
+
+    /** Returns whether an object exists at {@code path}. */
+    @Override
+    public boolean exists(String bucket, String path) {
+        profiler.objectGet();
+        Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        return blob != null;
+    }
+
+    /** Returns {@code true} when no object name in {@code bucket} starts with {@code path}. */
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) {
+        profiler.objectsList();
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
+        // hasNextPage() only says whether further pages exist, so a prefix whose objects all fit on the
+        // first page would wrongly be reported as empty. Look for an actual element instead.
+        return !blobs.iterateAll().iterator().hasNext();
+    }
+
+    /** Creates a parallel downloader for {@code bucket} that shares this client's configuration and profiler. */
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager)
+            throws HyracksDataException {
+        return new GCSParallelDownloader(bucket, ioManager, config, profiler);
+    }
+
+    /**
+     * Lists every object in {@code bucket} as a JSON array of {@code {"path", "size"}} entries, sorted
+     * case-insensitively by object name.
+     */
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
+        ArrayNode objectsInfo = objectMapper.createArrayNode();
+
+        List<Blob> objects = new ArrayList<>();
+        blobs.iterateAll().forEach(objects::add);
+        objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.getName(), y.getName()));
+        for (Blob blob : objects) {
+            ObjectNode objectInfo = objectsInfo.addObject();
+            objectInfo.put("path", blob.getName());
+            objectInfo.put("size", blob.getSize());
+        }
+        return objectsInfo;
+    }
+
+    /** Closes the underlying storage client, wrapping any failure in a {@link HyracksDataException}. */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            gcsClient.close();
+        } catch (Exception ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
new file mode 100644
index 0000000..281a855
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.cloud.storage.transfermanager.DownloadJob;
+import com.google.cloud.storage.transfermanager.DownloadResult;
+import com.google.cloud.storage.transfermanager.ParallelDownloadConfig;
+import com.google.cloud.storage.transfermanager.TransferManager;
+import com.google.cloud.storage.transfermanager.TransferManagerConfig;
+import com.google.cloud.storage.transfermanager.TransferStatus;
+
+/**
+ * {@link IParallelDownloader} that fetches GCS objects to local storage through the GCS
+ * {@link TransferManager}.
+ */
+public class GCSParallelDownloader implements IParallelDownloader {
+
+    private final String bucket;
+    private final IOManager ioManager;
+    private final Storage gcsClient;
+    private final TransferManager transferManager;
+    private final IRequestProfiler profiler;
+
+    /**
+     * @param bucket bucket all downloads are served from
+     * @param ioManager used to resolve failed object names back to file references
+     * @param config supplies the optional endpoint and the credentials
+     * @param profiler request profiler to notify
+     * @throws HyracksDataException if the credentials cannot be created
+     */
+    public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config, IRequestProfiler profiler)
+            throws HyracksDataException {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.profiler = profiler;
+        StorageOptions.Builder builder = StorageOptions.newBuilder();
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.setHost(config.getEndpoint());
+        }
+        builder.setCredentials(config.createCredentialsProvider());
+        // Two separate StorageOptions instances are built on purpose, so the listing client and the
+        // transfer manager do not share one options object and can be closed independently.
+        this.gcsClient = builder.build().getService();
+        this.transferManager =
+                TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService();
+    }
+
+    /**
+     * Downloads the given files, grouped by the mount point of their device, and waits for all
+     * download jobs to finish.
+     * <p>
+     * NOTE(review): the per-file download results are awaited but not inspected, so a failed transfer
+     * is not reported to the caller here; confirm whether that is intended.
+     *
+     * @throws HyracksDataException if a local parent directory cannot be created
+     */
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+        Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
+        try {
+            for (FileReference fileReference : toDownload) {
+                profiler.objectGet();
+                FileUtils.createParentDirectories(fileReference.getFile());
+                addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(),
+                        BlobInfo.newBuilder(BlobId.of(bucket, fileReference.getRelativePath())).build());
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
+        for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
+            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
+                    config.setDownloadDirectory(entry.getKey()).build()));
+        }
+        // getDownloadResults() is called for its blocking effect only.
+        downloadJobs.forEach(DownloadJob::getDownloadResults);
+    }
+
+    /**
+     * Downloads every object found under each given directory prefix.
+     *
+     * @return the files whose transfer did not finish with {@link TransferStatus#SUCCESS}
+     */
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+
+        Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
+        for (FileReference fileReference : toDownload) {
+            profiler.objectMultipartDownload();
+            Page<Blob> blobs = gcsClient.list(bucket, Storage.BlobListOption.prefix(fileReference.getRelativePath()));
+            for (Blob blob : blobs.iterateAll()) {
+                addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo());
+            }
+        }
+        List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
+        for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
+            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
+                    config.setDownloadDirectory(entry.getKey()).build()));
+        }
+        for (DownloadJob job : downloadJobs) {
+            for (DownloadResult result : job.getDownloadResults()) {
+                if (result.getStatus() != TransferStatus.SUCCESS) {
+                    FileReference failedFile = ioManager.resolve(result.getInput().getName());
+                    failedFiles.add(failedFile);
+                }
+            }
+        }
+        return failedFiles;
+    }
+
+    /**
+     * Closes the transfer manager and the storage client. The storage client is closed even when
+     * closing the transfer manager fails.
+     *
+     * @throws HyracksDataException wrapping the failure of either close call
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                transferManager.close();
+            } finally {
+                // Must run even if the transfer manager fails to close, otherwise the client leaks.
+                gcsClient.close();
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** Appends {@code value} to the list stored under {@code key}, creating the list on first use. */
+    private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
+        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
index 6600356..e3d978d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -73,5 +73,5 @@
/**
* Close cacher resources
*/
- void close();
+ void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 3cc481e..95330d9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -157,7 +157,7 @@
}
@Override
- public void close() {
+ public void close() throws HyracksDataException {
downloader.close();
LOGGER.info("Parallel cacher was closed");
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index bfa353a..277d425 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -175,7 +175,7 @@
return partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
}
- private void replace() {
+ private void replace() throws HyracksDataException {
cacher.close();
replacer.replace();
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriter.java
new file mode 100644
index 0000000..4090ad6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+import com.google.cloud.BaseServiceException;
+
+/**
+ * External file writer for the GCS adapter; supplies the adapter name, the maximum object-path
+ * length and the SDK exception type to its cloud-writer base class.
+ */
+final class GCSExternalFileWriter extends AbstractCloudExternalFileWriter {
+    /** Maximum object path length, in bytes, enforced by this writer. */
+    static final int MAX_LENGTH_IN_BYTES = 1024;
+
+    GCSExternalFileWriter(IExternalPrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath,
+            IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+        super(printer, cloudClient, bucket, partitionedPath, warningCollector, pathSourceLocation);
+    }
+
+    @Override
+    String getAdapterName() {
+        return ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
+    }
+
+    @Override
+    int getPathMaxLengthInBytes() {
+        return MAX_LENGTH_IN_BYTES;
+    }
+
+    /** Treats any Google Cloud {@link BaseServiceException} as an SDK failure. */
+    @Override
+    boolean isSdkException(Exception e) {
+        return e instanceof BaseServiceException;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
new file mode 100644
index 0000000..ca93ff6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import static org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.StorageException;
+
+/**
+ * Factory of external file writers that target Google Cloud Storage.
+ * <p>
+ * The cloud client is created lazily on the first {@link #createWriter} call and is then shared by
+ * all writers produced by this factory instance.
+ */
+public final class GCSExternalFileWriterFactory implements IExternalFileWriterFactory {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    static final char SEPARATOR = '/';
+    public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+        @Override
+        public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+            return new GCSExternalFileWriterFactory(configuration);
+        }
+
+        @Override
+        public char getSeparator() {
+            return SEPARATOR;
+        }
+    };
+    private final Map<String, String> configuration;
+    private final SourceLocation pathSourceLocation;
+    private final String staticPath;
+    private transient GCSCloudClient cloudClient;
+
+    private GCSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+        configuration = externalConfig.getConfiguration();
+        pathSourceLocation = externalConfig.getPathSourceLocation();
+        staticPath = externalConfig.getStaticPath();
+        cloudClient = null;
+    }
+
+    /**
+     * Creates a writer for the configured bucket. The path is treated as partitioned when no static
+     * path was configured.
+     *
+     * @throws HyracksDataException if the cloud client cannot be built
+     */
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+            throws HyracksDataException {
+        buildClient();
+        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        IExternalPrinter printer = printerFactory.createPrinter();
+        IWarningCollector warningCollector = context.getWarningCollector();
+        return new GCSExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
+                pathSourceLocation);
+    }
+
+    /** Lazily creates the shared cloud client; guarded by this factory's monitor. */
+    private void buildClient() throws HyracksDataException {
+        try {
+            synchronized (this) {
+                if (cloudClient == null) {
+                    GCSClientConfig config = GCSClientConfig.of(configuration);
+                    cloudClient = new GCSCloudClient(config, GCSUtils.buildClient(configuration));
+                }
+            }
+        } catch (CompilationException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public char getSeparator() {
+        return SEPARATOR;
+    }
+
+    /**
+     * Validates the sink configuration using a short-lived client that is always closed afterwards.
+     *
+     * @throws AlgebricksException if the bucket name is missing, the bucket cannot be accessed, the
+     *             static path is too long or not empty, or the write probe fails
+     */
+    @Override
+    public void validate() throws AlgebricksException {
+        GCSClientConfig config = GCSClientConfig.of(configuration);
+        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        ICloudClient testClient = new GCSCloudClient(config, GCSUtils.buildClient(configuration));
+
+        try {
+            if (bucket == null || bucket.isEmpty()) {
+                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+                        ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+            }
+            doValidate(testClient, bucket);
+        } catch (IOException e) {
+            if (e.getCause() instanceof StorageException) {
+                throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
+            } else {
+                throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
+                        ExceptionUtils.getMessageOrToString(e));
+            }
+        } catch (BaseServiceException e) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, e, getMessageOrToString(e));
+        } finally {
+            // The probe client is only needed for validation; release it on every path.
+            closeTestClient(testClient);
+        }
+    }
+
+    /** Closes the validation client; a close failure is logged and does not fail validation. */
+    private static void closeTestClient(ICloudClient testClient) {
+        try {
+            testClient.close();
+        } catch (Exception e) {
+            LOGGER.warn("Failed to close the GCS client used for validation", e);
+        }
+    }
+
+    /**
+     * Checks the static path (length and emptiness) and, unless disabled through
+     * {@link ExternalDataConstants#KEY_VALIDATE_WRITE_PERMISSION}, probes write access by writing a
+     * random value to a fresh object, reading it back and deleting it.
+     */
+    private void doValidate(ICloudClient testClient, String bucket) throws IOException, AlgebricksException {
+        if (staticPath != null) {
+            if (isExceedingMaxLength(staticPath, GCSExternalFileWriter.MAX_LENGTH_IN_BYTES)) {
+                throw new CompilationException(ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH, pathSourceLocation,
+                        staticPath, GCSExternalFileWriter.MAX_LENGTH_IN_BYTES,
+                        ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+            }
+
+            if (!testClient.isEmptyPrefix(bucket, staticPath)) {
+                throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+            }
+        }
+
+        String validateWritePermissions = configuration
+                .getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION, Boolean.TRUE.toString());
+        if (!Boolean.parseBoolean(validateWritePermissions)) {
+            return;
+        }
+
+        // Pick a probe object name that does not collide with an existing object.
+        Random random = new Random();
+        String pathPrefix = "testFile";
+        String path = pathPrefix + random.nextInt();
+        while (testClient.exists(bucket, path)) {
+            path = pathPrefix + random.nextInt();
+        }
+
+        long writeValue = random.nextLong();
+        byte[] data = new byte[Long.BYTES];
+        LongPointable.setLong(data, 0, writeValue);
+        ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, path);
+        CloudResettableInputStream stream = null;
+        boolean aborted = false;
+        try {
+            stream = new CloudResettableInputStream(writer, new WriterSingleBufferProvider());
+            stream.write(data, 0, data.length);
+        } catch (HyracksDataException e) {
+            aborted = true;
+            // The stream is still null if its construction is what failed.
+            if (stream != null) {
+                stream.abort();
+            }
+            // Surface the write failure to validate() rather than reading back an object that was
+            // never written.
+            throw e;
+        } finally {
+            if (stream != null && !aborted) {
+                stream.finish();
+                stream.close();
+            }
+        }
+
+        try {
+            byte[] readBack = testClient.readAllBytes(bucket, path);
+            if (readBack == null || readBack.length < Long.BYTES) {
+                // readAllBytes yields null when the object cannot be fetched.
+                LOGGER.warn("The writer can write but the written value could not be read back from {}", path);
+            } else {
+                long readValue = LongPointable.getLong(readBack, 0);
+                if (writeValue != readValue) {
+                    LOGGER.warn(
+                            "The writer can write but the written values wasn't successfully read back (wrote: {}, read:{})",
+                            writeValue, readValue);
+                }
+            }
+        } finally {
+            testClient.deleteObjects(bucket, Collections.singleton(path));
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
new file mode 100644
index 0000000..87a3e29
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.gcs;
+
+import org.apache.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageClass;
+import com.google.cloud.storage.StorageOptions;
+
+/**
+ * Runs the shared {@link LSMTest} suite against a GCS endpoint expected to listen locally on
+ * {@code MOCK_SERVER_PORT}.
+ */
+public class LSMGCSTest extends LSMTest {
+    private static Storage client;
+    private static final int MOCK_SERVER_PORT = 4443;
+    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+    private static final String MOCK_SERVER_REGION = "us-west2"; // does not matter the value
+    private static final String MOCK_SERVER_PROJECT_ID = "asterixdb-gcs-test-project-id";
+
+    /** Recreates the playground bucket from scratch and installs the cloud client used by the suite. */
+    @BeforeClass
+    public static void setup() throws Exception {
+        StorageOptions storageOptions = StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME)
+                .setCredentials(NoCredentials.getInstance()).setProjectId(MOCK_SERVER_PROJECT_ID).build();
+        client = storageOptions.getService();
+
+        cleanup();
+        BucketInfo playground = BucketInfo.newBuilder(PLAYGROUND_CONTAINER).setStorageClass(StorageClass.STANDARD)
+                .setLocation(MOCK_SERVER_REGION).build();
+        client.create(playground);
+        LOGGER.info("Client created successfully");
+        GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0);
+        CLOUD_CLIENT = new GCSCloudClient(config);
+    }
+
+    /** Best-effort removal of the playground bucket and its objects; failures are deliberately ignored. */
+    private static void cleanup() {
+        try {
+            for (Blob blob : client.list(PLAYGROUND_CONTAINER).iterateAll()) {
+                blob.delete();
+            }
+            client.delete(PLAYGROUND_CONTAINER);
+        } catch (Exception ignored) {
+            // ignore
+        }
+    }
+
+    /** Closes the storage client created in {@link #setup()}, if any. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LOGGER.info("Shutting down GCS mock client");
+        if (client != null) {
+            client.close();
+        }
+        LOGGER.info("GCS mock client shut down successfully");
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 98ccbce..5dfa803 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -320,7 +320,8 @@
static {
WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
- WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
+ WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
+ KEY_ADAPTER_NAME_GCS.toLowerCase());
WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index cf51200..8ca428b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.zip.Deflater;
+import org.apache.asterix.cloud.writer.GCSExternalFileWriterFactory;
import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -51,6 +52,7 @@
CREATOR_MAP = new HashMap<>();
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
+ addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, GCSExternalFileWriterFactory.PROVIDER);
}
public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 5bd1bdd..92a1285 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -84,6 +84,9 @@
<azurite.npm.install.stage>none</azurite.npm.install.stage>
<azurite.install.stage>none</azurite.install.stage>
<azurite.stage>none</azurite.stage>
+ <gcs.download.stage>none</gcs.download.stage>
+ <gcs.install.stage>none</gcs.install.stage>
+ <gcs.stage>none</gcs.stage>
<!-- Versions under dependencymanagement or used in many projects via properties -->
<algebricks.version>0.3.10-SNAPSHOT</algebricks.version>
@@ -756,6 +759,22 @@
</properties>
</profile>
<profile>
+ <id>gcs-tests</id>
+ <activation>
+ <os>
+ <family>unix</family>
+ </os>
+ <property>
+ <name>!skipTests</name>
+ </property>
+ </activation>
+ <properties>
+ <gcs.download.stage>process-classes</gcs.download.stage>
+ <gcs.install.stage>generate-test-resources</gcs.install.stage>
+ <gcs.stage>process-test-resources</gcs.stage>
+ </properties>
+ </profile>
+ <profile>
<id>skip-npm</id>
<activation>
<property>