[NO ISSUE][HYR][HTTP] Utility method to enable interruptable http requests
Change-Id: Id880af7e1e60195c53a043752d9339818c002a2c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6164
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java
new file mode 100644
index 0000000..b677132
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingConsumer<V> {
+ void process(V value) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 2c9790e..77f6c6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.http.server.utils;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -27,14 +28,22 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.BaseRequest;
import org.apache.hyracks.http.server.FormUrlEncodedRequest;
+import org.apache.hyracks.util.ThrowingConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -184,6 +193,32 @@
return i < 0 ? uri : uri.substring(0, i);
}
+ public static void handleStreamInterruptibly(CloseableHttpResponse response,
+ ThrowingConsumer<InputStreamReader> streamProcessor, ExecutorService executor,
+ Supplier<String> taskDescription) throws IOException, InterruptedException, ExecutionException {
+ // we have to consume the stream in a separate thread, as it not stop on interrupt; we need to
+ // instead close the connection to achieve the interrupt
+ Future<Void> readFuture = executor.submit(() -> {
+ InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
+ streamProcessor.process(reader);
+ return null;
+ });
+ try {
+ readFuture.get();
+ } catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
+ response.close();
+ try {
+ readFuture.get(1, TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ LOGGER.warn("{} did not exit on stream close due to interrupt after 1s", taskDescription);
+ readFuture.cancel(true);
+ } catch (ExecutionException ee) {
+ LOGGER.debug("ignoring exception awaiting aborted {} shutdown", taskDescription, ee);
+ }
+ throw ex;
+ }
+ }
+
public static class ContentType {
public static final String APPLICATION_ADM = "application/x-adm";
public static final String APPLICATION_JSON = "application/json";
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
new file mode 100644
index 0000000..bfc5fdd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+@FunctionalInterface
+public interface ThrowingConsumer<V> {
+ void process(V value) throws Exception;
+}