[ASTERIXDB-3616][EXT]: Add retry logic for external cloud writers
Ext-ref: MB-66723
Change-Id: Ie9fcea374a4a04286d82e9323c5d677f975707fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19825
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index d332944..0baebc7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -23,11 +23,13 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Set;
+import java.util.function.Predicate;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.io.request.ICloudRetryPredicate;
import org.apache.hyracks.control.nc.io.IOManager;
import com.fasterxml.jackson.databind.JsonNode;
@@ -168,4 +170,10 @@
* Performs any necessary closing and cleaning up
*/
void close() throws HyracksDataException;
+
+ Predicate<Exception> getObjectNotFoundExceptionPredicate();
+
+ default ICloudRetryPredicate getRetryUnlessNotFound() {
+ return ex -> Predicate.not(getObjectNotFoundExceptionPredicate()).test(ex);
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index ccd6da1..30148f4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -26,6 +26,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -143,6 +144,11 @@
cloudClient.close();
}
+ @Override
+ public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+ return cloudClient.getObjectNotFoundExceptionPredicate();
+ }
+
private static void fail() throws HyracksDataException {
double prob = RANDOM.nextInt(100) / 100.0d;
if (prob < ERROR_RATE.get()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 01a8b02..1195b8c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -34,6 +34,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -315,6 +316,11 @@
s3Client.close();
}
+ @Override
+ public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+ return ex -> ex instanceof NoSuchKeyException;
+ }
+
/**
* FOR TESTING ONLY
*/
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 84d68e3..91a4fba 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -34,6 +34,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -390,6 +391,11 @@
// Hence this implementation is a no op.
}
+ @Override
+ public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+ return ex -> (ex instanceof BlobStorageException bse) && bse.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND);
+ }
+
private static BlobContainerClient buildClient(AzBlobStorageClientConfig config) {
BlobContainerClientBuilder blobContainerClientBuilder =
new BlobContainerClientBuilder().containerName(config.getBucket()).endpoint(getEndpoint(config));
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 62c1835..bd0a044 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -32,6 +32,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.CloudFile;
@@ -301,6 +302,11 @@
}
}
+ @Override
+ public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+ return ex -> (ex instanceof BaseServiceException bse)&& bse.getCode() == 404;
+ }
+
private static Storage buildClient(GCSClientConfig config) throws HyracksDataException {
StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index d02b34e..f93ae92 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.writer;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption;
import org.apache.asterix.cloud.CloudOutputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -33,6 +34,7 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
import org.apache.hyracks.data.std.api.IValueReference;
import com.google.common.base.Utf8;
@@ -60,7 +62,7 @@
@Override
public final void open() throws HyracksDataException {
- printer.open();
+ runWithRetryIfSdkException(printer::open);
}
@Override
@@ -69,7 +71,8 @@
return;
}
- if (partitionedPath && !cloudClient.isEmptyPrefix(bucket, directory)) {
+ boolean emptyPrefix = runWithNoRetryOnInterruption(() -> cloudClient.isEmptyPrefix(bucket, directory));
+ if (partitionedPath && !emptyPrefix) {
throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, directory);
}
}
@@ -91,7 +94,7 @@
@Override
public final void write(IValueReference value) throws HyracksDataException {
try {
- printer.print(value);
+ runWithRetryIfSdkException(() -> printer.print(value));
} catch (HyracksDataException e) {
throw e;
} catch (Exception e) {
@@ -106,9 +109,9 @@
public final void abort() throws HyracksDataException {
try {
if (cloudWriter != null) {
- cloudWriter.abort();
+ runWithRetryIfSdkException(cloudWriter::abort);
}
- printer.close();
+ runWithRetryIfSdkException(printer::close);
} catch (HyracksDataException e) {
throw e;
} catch (Exception e) {
@@ -122,7 +125,7 @@
@Override
public final void close() throws HyracksDataException {
try {
- printer.close();
+ runWithRetryIfSdkException(printer::close);
} catch (HyracksDataException e) {
throw e;
} catch (Exception e) {
@@ -139,6 +142,10 @@
abstract boolean isSdkException(Exception e);
+ private void runWithRetryIfSdkException(ICloudRequest request) throws HyracksDataException {
+ runWithNoRetryOnInterruption(request, this::isSdkException);
+ }
+
private boolean checkAndWarnExceedingMaxLength(String fullPath) {
boolean exceeding = isExceedingMaxLength(fullPath, getPathMaxLengthInBytes());
if (exceeding && warningCollector.shouldWarn()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 1de64d0..2007e54 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -20,6 +20,7 @@
import static org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption;
import java.io.IOException;
import java.util.Collections;
@@ -40,6 +41,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -119,7 +121,7 @@
ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
}
- if (!testClient.isEmptyPrefix(bucket, staticPath)) {
+ if (!runWithNoRetryOnInterruption(() -> testClient.isEmptyPrefix(bucket, staticPath))) {
// Ensure that the static path is empty
throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
}
@@ -135,10 +137,16 @@
Random random = new Random();
String pathPrefix = "testFile";
String path = pathPrefix + random.nextInt();
- while (testClient.exists(bucket, path)) {
+
+ String existsFinalPath = path;
+ ICloudReturnableRequest<Boolean> existsRequest = () -> testClient.exists(bucket, existsFinalPath);
+ while (runWithNoRetryOnInterruption(existsRequest, testClient.getRetryUnlessNotFound())) {
path = pathPrefix + random.nextInt();
+ String existsFinalPathUpdated = path;
+ existsRequest = () -> testClient.exists(bucket, existsFinalPathUpdated);
}
+ final String finalPath = path;
long writeValue = random.nextLong();
byte[] data = new byte[Long.BYTES];
LongPointable.setLong(data, 0, writeValue);
@@ -146,28 +154,29 @@
ICloudWriter writer = testClient.createWriter(bucket, path, bufferProvider);
boolean aborted = false;
try {
- writer.write(data, 0, data.length);
+ runWithNoRetryOnInterruption(() -> writer.write(data, 0, data.length));
} catch (HyracksDataException e) {
- writer.abort();
+ runWithNoRetryOnInterruption(writer::abort);
aborted = true;
throw e;
} finally {
if (writer != null && !aborted) {
- writer.finish();
+ runWithNoRetryOnInterruption(writer::finish);
}
}
try {
- long readValue = LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+ byte[] bytes = runWithNoRetryOnInterruption(() -> testClient.readAllBytes(bucket, finalPath));
+ long readValue = LongPointable.getLong(bytes, 0);
if (writeValue != readValue) {
- // This should never happen unless S3 is messed up. But log for sanity check
+ // This should never happen unless cloud storage is messed up. But log for sanity check
LOGGER.warn(
"The writer can write but the written values wasn't successfully read back (wrote: {}, read:{})",
writeValue, readValue);
}
} finally {
// Delete the written file
- testClient.deleteObjects(bucket, Collections.singleton(path));
+ runWithNoRetryOnInterruption(() -> testClient.deleteObjects(bucket, Collections.singleton(finalPath)));
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
index 4dab80d..3ace317 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.cloud.io.request;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
/**
@@ -30,5 +31,5 @@
/**
* Run pre-retry routine before reattempting {@link ICloudRequest} or {@link ICloudReturnableRequest}
*/
- void beforeRetry();
+ void beforeRetry() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetryPredicate.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetryPredicate.java
new file mode 100644
index 0000000..bcaae11
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetryPredicate.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.util.function.Predicate;
+
+/**
+ * An interface for a condition that determines whether a cloud operation should stop or not.
+ * If met, the encountered exception is re-thrown as root cause in HyracksDataException
+ */
+@FunctionalInterface
+public interface ICloudRetryPredicate extends Predicate<Exception> {
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index da9c8fb..e011900 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetryPredicate;
import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
import org.apache.hyracks.util.ExponentialRetryPolicy;
import org.apache.hyracks.util.IRetryPolicy;
@@ -55,7 +56,8 @@
private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
private static final long MAX_DELAY_BETWEEN_RETRIES = getMaxDelayBetweenRetries();
- private static final ICloudBeforeRetryRequest NO_OP_RETRY = () -> {
+ private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e -> true;
+ private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> {
};
private CloudRetryableRequestUtil() {
@@ -67,7 +69,7 @@
* @param request request to run
*/
public static void run(ICloudRequest request) throws HyracksDataException {
- run(request, NO_OP_RETRY);
+ run(request, NO_OP_BEFORE_RETRY);
}
/**
@@ -89,7 +91,7 @@
* @return a value of return type
*/
public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
- return run(request, NO_OP_RETRY);
+ return run(request, NO_OP_BEFORE_RETRY);
}
/**
@@ -107,7 +109,7 @@
try {
while (true) {
try {
- return doRun(request, retry);
+ return doRun(request, retry, RETRY_ALWAYS_PREDICATE);
} catch (Throwable e) {
// First, clear the interrupted flag
interrupted |= Thread.interrupted();
@@ -135,7 +137,7 @@
* @param request request to run
*/
public static void runWithNoRetryOnInterruption(ICloudRequest request) throws HyracksDataException {
- doRun(request, NO_OP_RETRY);
+ runWithNoRetryOnInterruption(request, RETRY_ALWAYS_PREDICATE);
}
/**
@@ -151,14 +153,41 @@
doRun(request, retry);
}
- private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+ public static void runWithNoRetryOnInterruption(ICloudRequest request, ICloudRetryPredicate shouldRetry)
throws HyracksDataException {
+ doRun(request, NO_OP_BEFORE_RETRY, shouldRetry);
+ }
+
+ public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request) throws HyracksDataException {
+ return runWithNoRetryOnInterruption(request, NO_OP_BEFORE_RETRY, RETRY_ALWAYS_PREDICATE);
+ }
+
+ public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+ throws HyracksDataException {
+ return runWithNoRetryOnInterruption(request, retry, RETRY_ALWAYS_PREDICATE);
+ }
+
+ public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request,
+ ICloudRetryPredicate shouldRetry) throws HyracksDataException {
+ return runWithNoRetryOnInterruption(request, NO_OP_BEFORE_RETRY, shouldRetry);
+ }
+
+ public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry,
+ ICloudRetryPredicate shouldRetry) throws HyracksDataException {
+ return doRun(request, retry, shouldRetry);
+ }
+
+ private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry,
+ ICloudRetryPredicate shouldRetry) throws HyracksDataException {
int attempt = 1;
IRetryPolicy retryPolicy = null;
while (true) {
try {
return request.call();
} catch (IOException | BaseServiceException | SdkException e) {
+ if (!shouldRetry.test(e)) {
+ throw HyracksDataException.create(e);
+ }
if (retryPolicy == null) {
retryPolicy = new ExponentialRetryPolicy(NUMBER_OF_RETRIES, MAX_DELAY_BETWEEN_RETRIES);
}
@@ -182,7 +211,12 @@
}
private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
- doRun(asReturnableRequest(request), retry);
+ doRun(request, retry, RETRY_ALWAYS_PREDICATE);
+ }
+
+ private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry, ICloudRetryPredicate shouldRetry)
+ throws HyracksDataException {
+ doRun(asReturnableRequest(request), retry, shouldRetry);
}
private static int getNumberOfRetries() {