[ASTERIXDB-3591][RT] Ensure close/fail of pipeline start uninterrupted

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- propagate interrupted exception while waiting in retry() of
ExponentialRetryPolicy.

Ext-ref: MB-66048

Change-Id: I49f859e1b8b72f7ae5e7bdbbb759389c6789fa0b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19623
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index c801283..f74a995 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -490,7 +490,7 @@
       <id>asterix-gerrit-asterix-app</id>
       <properties>
         <test.excludes>
-          **/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+          **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
           **/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           **/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           **/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -611,6 +611,7 @@
       <properties>
         <test.includes>
           **/CloudStorageTest.java,
+          **/CloudStorageCancellationTest.java,
           **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
         </test.includes>
         <failIfNoTests>false</failIfNoTests>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
new file mode 100644
index 0000000..2e1f94f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.DELTA_RESULT_PATH;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.EXCLUDED_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.ONLY_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.SUITE_TESTS;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.test.common.CancellationTestExecutor;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run tests in cloud deployment environment
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageCancellationTest {
+
+    private final TestCaseContext tcCtx;
+
+    public CloudStorageCancellationTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestExecutor testExecutor = new CancellationTestExecutor(DELTA_RESULT_PATH);
+        CloudStorageTest.setupEnv(testExecutor);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+        LocalCloudUtilAdobeMock.shutdownSilently();
+    }
+
+    @Parameters(name = "CloudStorageCancellationTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index 498f060..3405838 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -57,16 +57,16 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final TestCaseContext tcCtx;
-    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
-    private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
-    private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
-    private static final String DELTA_RESULT_PATH = "results_cloud";
-    private static final String EXCLUDED_TESTS = "MP";
+    public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
+    public static final String DELTA_RESULT_PATH = "results_cloud";
+    public static final String EXCLUDED_TESTS = "MP";
 
-    private static final String PLAYGROUND_CONTAINER = "playground";
-    private static final String MOCK_SERVER_REGION = "us-west-2";
-    private static final int MOCK_SERVER_PORT = 8001;
-    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    public static final String MOCK_SERVER_REGION = "us-west-2";
+    public static final int MOCK_SERVER_PORT = 8001;
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
 
     public CloudStorageTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
@@ -74,8 +74,12 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
         TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        setupEnv(testExecutor);
+    }
+
+    public static void setupEnv(TestExecutor testExecutor) throws Exception {
+        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
         testExecutor.executorId = "cloud";
         testExecutor.stripSubstring = "//DB:";
         LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index b4b7352..dac71ce 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -49,6 +49,14 @@
 
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
+    public CancellationTestExecutor() {
+        super();
+    }
+
+    public CancellationTestExecutor(String deltaPath) {
+        super(deltaPath);
+    }
+
     @Override
     public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
             List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, Charset responseCharset,
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index 7a272cc..8f95e34 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -49,6 +49,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>algebricks-common</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index f6d9cd1..560f817 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -170,25 +171,21 @@
             PipelineAssembler pa =
                     new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
             startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
-            HyracksDataException exception = null;
+            Exception exception = null;
             try {
                 startOfPipeline.open();
             } catch (Exception e) {
-                startOfPipeline.fail();
-                exception = HyracksDataException.create(e);
+                exception = e;
             } finally {
-                try {
-                    startOfPipeline.close();
-                } catch (Exception e) {
-                    if (exception == null) {
-                        exception = HyracksDataException.create(e);
-                    } else {
-                        exception.addSuppressed(e);
-                    }
+                if (exception != null) {
+                    exception = InvokeUtil.tryUninterruptibleWithCleanups(exception, startOfPipeline::fail,
+                            startOfPipeline::close);
+                } else {
+                    exception = InvokeUtil.runUninterruptible(exception, startOfPipeline::close);
                 }
             }
             if (exception != null) {
-                throw exception;
+                throw HyracksDataException.create(exception);
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ff0a3c5..9326427 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -285,6 +285,57 @@
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
+    public static Exception tryUninterruptibleWithCleanups(Exception root, ThrowingAction action,
+            ThrowingAction... cleanups) {
+        try {
+            tryUninterruptibleWithCleanups(action, cleanups);
+        } catch (Exception e) {
+            root = ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
+    public static void tryUninterruptibleWithCleanups(ThrowingAction action, ThrowingAction... cleanups)
+            throws Exception {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            runUninterruptible(action);
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (ThrowingAction cleanup : cleanups) {
+                try {
+                    runUninterruptible(cleanup);
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT instanceof Exception) {
+            throw (Exception) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     // catching Throwable, instanceofs, false-positive unreachable code
     public static void tryWithCleanups(ThrowingAction action, ThrowingConsumer<Throwable>... cleanups)
             throws Exception {
@@ -426,6 +477,19 @@
      * Runs the supplied action, after suspending any pending interruption. An error will be logged if
      * the action is itself interrupted.
      */
+    public static Exception runUninterruptible(Exception root, ThrowingAction action) {
+        try {
+            runUninterruptible(action);
+        } catch (Exception e) {
+            root = ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
+    /**
+     * Runs the supplied action, after suspending any pending interruption. An error will be logged if
+     * the action is itself interrupted.
+     */
     public static void runUninterruptible(ThrowingAction action) throws Exception {
         boolean interrupted = Thread.interrupted();
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index c36e4a7..da9c8fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -166,8 +166,12 @@
                     LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e);
                     Thread.currentThread().interrupt();
                 }
-                if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
-                    throw HyracksDataException.create(e);
+                try {
+                    if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
+                        throw HyracksDataException.create(e);
+                    }
+                } catch (InterruptedException interruptedEx) {
+                    throw HyracksDataException.create(interruptedEx);
                 }
                 attempt++;
                 retry.beforeRetry();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 0bf74ee..614a226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -63,8 +63,8 @@
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -420,30 +420,29 @@
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
-                    try {
-                        writer.open();
-                        VSizeFrame frame = new VSizeFrame(this);
-                        while (reader.nextFrame(frame)) {
-                            if (aborted) {
-                                return;
-                            }
-                            ByteBuffer buffer = frame.getBuffer();
-                            writer.nextFrame(buffer);
-                            buffer.compact();
+                    writer.open();
+                    VSizeFrame frame = new VSizeFrame(this);
+                    while (reader.nextFrame(frame)) {
+                        if (aborted) {
+                            return;
                         }
-                    } catch (Exception e) {
-                        originalEx = e;
-                        CleanupUtils.fail(writer, originalEx);
-                    } finally {
-                        originalEx = CleanupUtils.closeSilently(writer, originalEx);
+                        ByteBuffer buffer = frame.getBuffer();
+                        writer.nextFrame(buffer);
+                        buffer.compact();
                     }
+                } catch (Exception e) {
+                    originalEx = e;
                 } finally {
-                    originalEx = CleanupUtils.closeSilently(reader, originalEx);
+                    if (originalEx != null) {
+                        InvokeUtil.tryUninterruptibleWithCleanups(writer::fail, writer::close, reader::close);
+                    } else {
+                        InvokeUtil.tryUninterruptibleWithCleanups(writer::close, reader::close);
+                    }
                 }
             } catch (Exception e) {
                 originalEx = ExceptionUtils.suppress(originalEx, e);
             } finally {
-                originalEx = CleanupUtils.closeSilently(collector, originalEx);
+                InvokeUtil.runUninterruptible(collector::close);
             }
         } catch (Exception e) {
             originalEx = ExceptionUtils.suppress(originalEx, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 52bd07e..7ec4f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -127,12 +127,17 @@
         if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
             IRetryPolicy policy = new ExponentialRetryPolicy();
             while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
-                if (policy.retry(operation.getFailure())) {
-                    operation.setFailure(null);
-                    operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
-                    lsmHarness.flush(operation);
-                } else {
-                    break;
+                try {
+                    if (policy.retry(operation.getFailure())) {
+                        operation.setFailure(null);
+                        operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                        lsmHarness.flush(operation);
+                    } else {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    // in reality, this thread won't be interrupted
+                    throw HyracksDataException.create(e);
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index d8900ad..ee63547 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -98,12 +98,17 @@
         if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
             IRetryPolicy policy = new ExponentialRetryPolicy();
             while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
-                if (policy.retry(operation.getFailure())) {
-                    operation.setFailure(null);
-                    operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
-                    lsmHarness.flush(operation);
-                } else {
-                    break;
+                try {
+                    if (policy.retry(operation.getFailure())) {
+                        operation.setFailure(null);
+                        operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                        lsmHarness.flush(operation);
+                    } else {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    // in reality, this thread won't be interrupted
+                    throw HyracksDataException.create(e);
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 080b9ea..72a3bb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -84,17 +84,13 @@
     }
 
     @Override
-    public boolean retry(Throwable failure) {
+    public boolean retry(Throwable failure) throws InterruptedException {
         if (attempt < maxRetries) {
-            try {
-                long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
-                if (printDebugLines) {
-                    LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
-                }
-                TimeUnit.MILLISECONDS.sleep(sleepTime);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+            long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
+            if (printDebugLines) {
+                LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
             }
+            TimeUnit.MILLISECONDS.sleep(sleepTime);
             attempt++;
             delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
             return true;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 0d18a2b..2fd0191 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -24,6 +24,7 @@
      * @param failure
      *            the cause of the failure (this cannot be null)
      * @return true if one more attempt should be done
+     * @throws InterruptedException if the retry policy can be interrupted
      */
-    boolean retry(Throwable failure);
+    boolean retry(Throwable failure) throws InterruptedException;
 }