[NO ISSUE][HYR][HTTP] http stream handler -> consumer / processor
Change-Id: I7cd7622dbee880845d0b4233ce3a3b17af15eebc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11524
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index c61e0d2..835cd54 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.http.server.FormUrlEncodedRequest;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.util.ThrowingConsumer;
+import org.apache.hyracks.util.ThrowingFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -191,16 +192,23 @@
return i < 0 ? uri : uri.substring(0, i);
}
- public static void handleStreamInterruptibly(CloseableHttpResponse response,
+ public static void consumeStreamInterruptibly(CloseableHttpResponse response,
ThrowingConsumer<Reader> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier)
+ throws InterruptedException, ExecutionException, IOException {
+ processStreamInterruptibly(response, ThrowingConsumer.asFunction(streamProcessor), executor,
+ descriptionSupplier);
+ }
+
+ public static <T> T processStreamInterruptibly(CloseableHttpResponse response,
+ ThrowingFunction<Reader, T> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier)
throws IOException, InterruptedException, ExecutionException {
// we have to consume the stream in a separate thread, as it not stop on interrupt; we need to
// instead close the connection to achieve the interrupt
String description = descriptionSupplier.get();
- Future<Void> readFuture = executor.submit(() -> {
+ Future<T> readFuture = executor.submit(() -> {
Thread.currentThread().setName(description);
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
- streamProcessor.process(new Reader() {
+ return streamProcessor.process(new Reader() {
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
return reader.read(cbuf, off, len);
@@ -213,10 +221,9 @@
LOGGER.debug("ignoring close on {}", reader);
}
});
- return null;
});
try {
- readFuture.get();
+ return readFuture.get();
} catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
response.close();
try {