Merge branch 'gerrit/mad-hatter'
Change-Id: Icb703199fd6eb3e41e19e288192f2b72215b4b06
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 77e7c5a..4f20174 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -194,13 +195,25 @@
}
public static void handleStreamInterruptibly(CloseableHttpResponse response,
- ThrowingConsumer<InputStreamReader> streamProcessor, ExecutorService executor,
- Supplier<String> taskDescription) throws IOException, InterruptedException, ExecutionException {
+ ThrowingConsumer<Reader> streamProcessor, ExecutorService executor, Supplier<String> taskDescription)
+ throws IOException, InterruptedException, ExecutionException {
// we have to consume the stream in a separate thread, as it not stop on interrupt; we need to
// instead close the connection to achieve the interrupt
Future<Void> readFuture = executor.submit(() -> {
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
- streamProcessor.process(reader);
+ streamProcessor.process(new Reader() {
+ @Override
+ public int read(char[] cbuf, int off, int len) throws IOException {
+ return reader.read(cbuf, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // this will block until the response is closed, which will cause hangs if the stream processor
+ // tries to close the reader e.g. on processing failure
+ LOGGER.debug("ignoring close on {}", reader);
+ }
+ });
return null;
});
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 4991f86..abd9fda 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -20,6 +20,9 @@
import java.lang.reflect.Field;
import java.util.IdentityHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -63,6 +66,8 @@
private static final ExitThread exitThread = new ExitThread();
private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();
private static final MutableLong shutdownHaltDelay = new MutableLong(10 * 60 * 1000L); // 10 minutes default
+ private static final ExecutorService haltThreadDumpExecutor = Executors.newSingleThreadExecutor();
+ private static final long HALT_THREADDUMP_TIMEOUT_SECONDS = 60;
static {
watchdogThread.start();
@@ -97,11 +102,19 @@
}
public static synchronized void halt(int status, Level logLevel) {
- LOGGER.log(logLevel, "JVM halting with status {}; thread dump at halt: {}", status,
- ThreadDumpUtil.takeDumpString());
- // try to give time for the log to be emitted...
- LogManager.shutdown();
- Runtime.getRuntime().halt(status);
+ try {
+ Future<?> future = haltThreadDumpExecutor.submit(() -> {
+ LOGGER.log(logLevel, "JVM halting with status {}; thread dump at halt: {}", status,
+ ThreadDumpUtil.takeDumpString());
+ // try to give time for the log to be emitted...
+ LogManager.shutdown();
+ });
+ future.get(HALT_THREADDUMP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOGGER.warn("exception logging thread dump on halt", e);
+ } finally {
+ Runtime.getRuntime().halt(status);
+ }
}
public static boolean registerShutdownHook(Thread shutdownHook) {