Merge branch 'gerrit/mad-hatter'

Change-Id: Icb703199fd6eb3e41e19e288192f2b72215b4b06
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 77e7c5a..4f20174 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.Reader;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -194,13 +195,25 @@
     }
 
     public static void handleStreamInterruptibly(CloseableHttpResponse response,
-            ThrowingConsumer<InputStreamReader> streamProcessor, ExecutorService executor,
-            Supplier<String> taskDescription) throws IOException, InterruptedException, ExecutionException {
+            ThrowingConsumer<Reader> streamProcessor, ExecutorService executor, Supplier<String> taskDescription)
+            throws IOException, InterruptedException, ExecutionException {
         // we have to consume the stream in a separate thread, as it not stop on interrupt; we need to
         // instead close the connection to achieve the interrupt
         Future<Void> readFuture = executor.submit(() -> {
             InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
-            streamProcessor.process(reader);
+            streamProcessor.process(new Reader() {
+                @Override
+                public int read(char[] cbuf, int off, int len) throws IOException {
+                    return reader.read(cbuf, off, len);
+                }
+
+                @Override
+                public void close() throws IOException {
+                    // this will block until the response is closed, which will cause hangs if the stream processor
+                    // tries to close the reader e.g. on processing failure
+                    LOGGER.debug("ignoring close on {}", reader);
+                }
+            });
             return null;
         });
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 4991f86..abd9fda 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -20,6 +20,9 @@
 
 import java.lang.reflect.Field;
 import java.util.IdentityHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -63,6 +66,8 @@
     private static final ExitThread exitThread = new ExitThread();
     private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();
     private static final MutableLong shutdownHaltDelay = new MutableLong(10 * 60 * 1000L); // 10 minutes default
+    private static final ExecutorService haltThreadDumpExecutor = Executors.newSingleThreadExecutor();
+    private static final long HALT_THREADDUMP_TIMEOUT_SECONDS = 60;
 
     static {
         watchdogThread.start();
@@ -97,11 +102,19 @@
     }
 
     public static synchronized void halt(int status, Level logLevel) {
-        LOGGER.log(logLevel, "JVM halting with status {}; thread dump at halt: {}", status,
-                ThreadDumpUtil.takeDumpString());
-        // try to give time for the log to be emitted...
-        LogManager.shutdown();
-        Runtime.getRuntime().halt(status);
+        try {
+            Future<?> future = haltThreadDumpExecutor.submit(() -> {
+                LOGGER.log(logLevel, "JVM halting with status {}; thread dump at halt: {}", status,
+                        ThreadDumpUtil.takeDumpString());
+                // try to give time for the log to be emitted...
+                LogManager.shutdown();
+            });
+            future.get(HALT_THREADDUMP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOGGER.warn("exception logging thread dump on halt", e);
+        } finally {
+            Runtime.getRuntime().halt(status);
+        }
     }
 
     public static boolean registerShutdownHook(Thread shutdownHook) {