[NO ISSUE][HYR][HTTP] Utility method to enable interruptable http requests

Change-Id: Id880af7e1e60195c53a043752d9339818c002a2c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6164
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java
new file mode 100644
index 0000000..b677132
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingConsumer<V> {
+    void process(V value) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 2c9790e..77f6c6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.http.server.utils;
 
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -27,14 +28,22 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalDouble;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.BaseRequest;
 import org.apache.hyracks.http.server.FormUrlEncodedRequest;
+import org.apache.hyracks.util.ThrowingConsumer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -184,6 +193,32 @@
         return i < 0 ? uri : uri.substring(0, i);
     }
 
+    public static void handleStreamInterruptibly(CloseableHttpResponse response,
+            ThrowingConsumer<InputStreamReader> streamProcessor, ExecutorService executor,
+            Supplier<String> taskDescription) throws IOException, InterruptedException, ExecutionException {
+        // we have to consume the stream in a separate thread, as it not stop on interrupt; we need to
+        // instead close the connection to achieve the interrupt
+        Future<Void> readFuture = executor.submit(() -> {
+            InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            streamProcessor.process(reader);
+            return null;
+        });
+        try {
+            readFuture.get();
+        } catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
+            response.close();
+            try {
+                readFuture.get(1, TimeUnit.SECONDS);
+            } catch (TimeoutException te) {
+                LOGGER.warn("{} did not exit on stream close due to interrupt after 1s", taskDescription);
+                readFuture.cancel(true);
+            } catch (ExecutionException ee) {
+                LOGGER.debug("ignoring exception awaiting aborted {} shutdown", taskDescription, ee);
+            }
+            throw ex;
+        }
+    }
+
     public static class ContentType {
         public static final String APPLICATION_ADM = "application/x-adm";
         public static final String APPLICATION_JSON = "application/json";
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
new file mode 100644
index 0000000..bfc5fdd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+@FunctionalInterface
+public interface ThrowingConsumer<V> {
+    void process(V value) throws Exception;
+}