[NO ISSUE][HYR][HTTP] http stream handler -> consumer / processor

Change-Id: I7cd7622dbee880845d0b4233ce3a3b17af15eebc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11524
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index c61e0d2..835cd54 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -45,6 +45,7 @@
 import org.apache.hyracks.http.server.FormUrlEncodedRequest;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.util.ThrowingConsumer;
+import org.apache.hyracks.util.ThrowingFunction;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -191,16 +192,23 @@
         return i < 0 ? uri : uri.substring(0, i);
     }
 
-    public static void handleStreamInterruptibly(CloseableHttpResponse response,
+    public static void consumeStreamInterruptibly(CloseableHttpResponse response,
             ThrowingConsumer<Reader> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier)
+            throws InterruptedException, ExecutionException, IOException {
+        processStreamInterruptibly(response, ThrowingConsumer.asFunction(streamProcessor), executor,
+                descriptionSupplier);
+    }
+
+    public static <T> T processStreamInterruptibly(CloseableHttpResponse response,
+            ThrowingFunction<Reader, T> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier)
             throws IOException, InterruptedException, ExecutionException {
         // we have to consume the stream in a separate thread, as it not stop on interrupt; we need to
         // instead close the connection to achieve the interrupt
         String description = descriptionSupplier.get();
-        Future<Void> readFuture = executor.submit(() -> {
+        Future<T> readFuture = executor.submit(() -> {
             Thread.currentThread().setName(description);
             InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
-            streamProcessor.process(new Reader() {
+            return streamProcessor.process(new Reader() {
                 @Override
                 public int read(char[] cbuf, int off, int len) throws IOException {
                     return reader.read(cbuf, off, len);
@@ -213,10 +221,9 @@
                     LOGGER.debug("ignoring close on {}", reader);
                 }
             });
-            return null;
         });
         try {
-            readFuture.get();
+            return readFuture.get();
         } catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
             response.close();
             try {