[ASTERIXDB-3591][RT] Ensure close/fail of pipeline start uninterrupted
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- propagate interrupted exception while waiting in retry() of
ExponentialRetryPolicy.
Ext-ref: MB-66048
Change-Id: I49f859e1b8b72f7ae5e7bdbbb759389c6789fa0b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19623
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index c801283..f74a995 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -490,7 +490,7 @@
<id>asterix-gerrit-asterix-app</id>
<properties>
<test.excludes>
- **/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+ **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -611,6 +611,7 @@
<properties>
<test.includes>
**/CloudStorageTest.java,
+ **/CloudStorageCancellationTest.java,
**/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
</test.includes>
<failIfNoTests>false</failIfNoTests>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
new file mode 100644
index 0000000..2e1f94f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.DELTA_RESULT_PATH;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.EXCLUDED_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.ONLY_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.SUITE_TESTS;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.test.common.CancellationTestExecutor;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run tests in cloud deployment environment
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageCancellationTest {
+
+ private final TestCaseContext tcCtx;
+
+ public CloudStorageCancellationTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TestExecutor testExecutor = new CancellationTestExecutor(DELTA_RESULT_PATH);
+ CloudStorageTest.setupEnv(testExecutor);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ LocalCloudUtilAdobeMock.shutdownSilently();
+ }
+
+ @Parameters(name = "CloudStorageCancellationTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+ }
+
+ @Test
+ public void test() throws Exception {
+ List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+ Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static String getText(Description description) {
+ return description == null ? "" : description.getValue();
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index 498f060..3405838 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -57,16 +57,16 @@
private static final Logger LOGGER = LogManager.getLogger();
private final TestCaseContext tcCtx;
- private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
- private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
- private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
- private static final String DELTA_RESULT_PATH = "results_cloud";
- private static final String EXCLUDED_TESTS = "MP";
+ public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+ public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+ public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
+ public static final String DELTA_RESULT_PATH = "results_cloud";
+ public static final String EXCLUDED_TESTS = "MP";
- private static final String PLAYGROUND_CONTAINER = "playground";
- private static final String MOCK_SERVER_REGION = "us-west-2";
- private static final int MOCK_SERVER_PORT = 8001;
- private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+ public static final String PLAYGROUND_CONTAINER = "playground";
+ public static final String MOCK_SERVER_REGION = "us-west-2";
+ public static final int MOCK_SERVER_PORT = 8001;
+ public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
public CloudStorageTest(TestCaseContext tcCtx) {
this.tcCtx = tcCtx;
@@ -74,8 +74,12 @@
@BeforeClass
public static void setUp() throws Exception {
- LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+ setupEnv(testExecutor);
+ }
+
+ public static void setupEnv(TestExecutor testExecutor) throws Exception {
+ LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
testExecutor.executorId = "cloud";
testExecutor.stripSubstring = "//DB:";
LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index b4b7352..dac71ce 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -49,6 +49,14 @@
private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ public CancellationTestExecutor() {
+ super();
+ }
+
+ public CancellationTestExecutor(String deltaPath) {
+ super(deltaPath);
+ }
+
@Override
public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, Charset responseCharset,
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index 7a272cc..8f95e34 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -49,6 +49,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-common</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index f6d9cd1..560f817 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -170,25 +171,21 @@
PipelineAssembler pa =
new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
- HyracksDataException exception = null;
+ Exception exception = null;
try {
startOfPipeline.open();
} catch (Exception e) {
- startOfPipeline.fail();
- exception = HyracksDataException.create(e);
+ exception = e;
} finally {
- try {
- startOfPipeline.close();
- } catch (Exception e) {
- if (exception == null) {
- exception = HyracksDataException.create(e);
- } else {
- exception.addSuppressed(e);
- }
+ if (exception != null) {
+ exception = InvokeUtil.tryUninterruptibleWithCleanups(exception, startOfPipeline::fail,
+ startOfPipeline::close);
+ } else {
+ exception = InvokeUtil.runUninterruptible(exception, startOfPipeline::close);
}
}
if (exception != null) {
- throw exception;
+ throw HyracksDataException.create(exception);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ff0a3c5..9326427 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -285,6 +285,57 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+ // catching Throwable, instanceofs, false-positive unreachable code
+ public static Exception tryUninterruptibleWithCleanups(Exception root, ThrowingAction action,
+ ThrowingAction... cleanups) {
+ try {
+ tryUninterruptibleWithCleanups(action, cleanups);
+ } catch (Exception e) {
+ root = ExceptionUtils.suppress(root, e);
+ }
+ return root;
+ }
+
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+ // catching Throwable, instanceofs, false-positive unreachable code
+ public static void tryUninterruptibleWithCleanups(ThrowingAction action, ThrowingAction... cleanups)
+ throws Exception {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ runUninterruptible(action);
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (ThrowingAction cleanup : cleanups) {
+ try {
+ runUninterruptible(cleanup);
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else if (savedT instanceof Exception) {
+ throw (Exception) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
// catching Throwable, instanceofs, false-positive unreachable code
public static void tryWithCleanups(ThrowingAction action, ThrowingConsumer<Throwable>... cleanups)
throws Exception {
@@ -426,6 +477,19 @@
* Runs the supplied action, after suspending any pending interruption. An error will be logged if
* the action is itself interrupted.
*/
+ public static Exception runUninterruptible(Exception root, ThrowingAction action) {
+ try {
+ runUninterruptible(action);
+ } catch (Exception e) {
+ root = ExceptionUtils.suppress(root, e);
+ }
+ return root;
+ }
+
+ /**
+ * Runs the supplied action, after suspending any pending interruption. An error will be logged if
+ * the action is itself interrupted.
+ */
public static void runUninterruptible(ThrowingAction action) throws Exception {
boolean interrupted = Thread.interrupted();
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index c36e4a7..da9c8fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -166,8 +166,12 @@
LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e);
Thread.currentThread().interrupt();
}
- if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
- throw HyracksDataException.create(e);
+ try {
+ if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
+ throw HyracksDataException.create(e);
+ }
+ } catch (InterruptedException interruptedEx) {
+ throw HyracksDataException.create(interruptedEx);
}
attempt++;
retry.beforeRetry();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 0bf74ee..614a226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -63,8 +63,8 @@
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -420,30 +420,29 @@
IFrameReader reader = collector.getReader();
reader.open();
try {
- try {
- writer.open();
- VSizeFrame frame = new VSizeFrame(this);
- while (reader.nextFrame(frame)) {
- if (aborted) {
- return;
- }
- ByteBuffer buffer = frame.getBuffer();
- writer.nextFrame(buffer);
- buffer.compact();
+ writer.open();
+ VSizeFrame frame = new VSizeFrame(this);
+ while (reader.nextFrame(frame)) {
+ if (aborted) {
+ return;
}
- } catch (Exception e) {
- originalEx = e;
- CleanupUtils.fail(writer, originalEx);
- } finally {
- originalEx = CleanupUtils.closeSilently(writer, originalEx);
+ ByteBuffer buffer = frame.getBuffer();
+ writer.nextFrame(buffer);
+ buffer.compact();
}
+ } catch (Exception e) {
+ originalEx = e;
} finally {
- originalEx = CleanupUtils.closeSilently(reader, originalEx);
+ if (originalEx != null) {
+ InvokeUtil.tryUninterruptibleWithCleanups(writer::fail, writer::close, reader::close);
+ } else {
+ InvokeUtil.tryUninterruptibleWithCleanups(writer::close, reader::close);
+ }
}
} catch (Exception e) {
originalEx = ExceptionUtils.suppress(originalEx, e);
} finally {
- originalEx = CleanupUtils.closeSilently(collector, originalEx);
+ InvokeUtil.runUninterruptible(collector::close);
}
} catch (Exception e) {
originalEx = ExceptionUtils.suppress(originalEx, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 52bd07e..7ec4f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -127,12 +127,17 @@
if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
IRetryPolicy policy = new ExponentialRetryPolicy();
while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
- if (policy.retry(operation.getFailure())) {
- operation.setFailure(null);
- operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
- lsmHarness.flush(operation);
- } else {
- break;
+ try {
+ if (policy.retry(operation.getFailure())) {
+ operation.setFailure(null);
+ operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+ lsmHarness.flush(operation);
+ } else {
+ break;
+ }
+ } catch (InterruptedException e) {
+ // in reality, this thread won't be interrupted
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index d8900ad..ee63547 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -98,12 +98,17 @@
if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
IRetryPolicy policy = new ExponentialRetryPolicy();
while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
- if (policy.retry(operation.getFailure())) {
- operation.setFailure(null);
- operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
- lsmHarness.flush(operation);
- } else {
- break;
+ try {
+ if (policy.retry(operation.getFailure())) {
+ operation.setFailure(null);
+ operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+ lsmHarness.flush(operation);
+ } else {
+ break;
+ }
+ } catch (InterruptedException e) {
+ // in reality, this thread won't be interrupted
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 080b9ea..72a3bb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -84,17 +84,13 @@
}
@Override
- public boolean retry(Throwable failure) {
+ public boolean retry(Throwable failure) throws InterruptedException {
if (attempt < maxRetries) {
- try {
- long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
- if (printDebugLines) {
- LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
- }
- TimeUnit.MILLISECONDS.sleep(sleepTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
+ if (printDebugLines) {
+ LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
}
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
attempt++;
delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
return true;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 0d18a2b..2fd0191 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -24,6 +24,7 @@
* @param failure
* the cause of the failure (this cannot be null)
* @return true if one more attempt should be done
+ * @throws InterruptedException if the retry policy can be interrupted
*/
- boolean retry(Throwable failure);
+ boolean retry(Throwable failure) throws InterruptedException;
}