diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 9ace417..6041ab5 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -72,7 +72,7 @@
 
     @Override
     public String toString() {
-        return ActivePartitionMessage.class.getSimpleName() + event;
+        return ActivePartitionMessage.class.getSimpleName() + '-' + event;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index e30272c..c6f41bf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -362,7 +362,16 @@
             cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
             LOGGER.log(level, "Recovery task has been submitted");
-            recoveryTask = executor.submit(() -> doRecover(policy));
+            recoveryTask = executor.submit(() -> {
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName("RecoveryTask (" + entityId + ")");
+                    doRecover(policy);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                return null;
+            });
         }
     }
 
@@ -378,11 +387,13 @@
             synchronized (this) {
                 if (cancelRecovery) {
                     recoveryTask = null;
+                    notifyAll();
                     return null;
                 }
                 while (clusterStateManager.getState() != ClusterState.ACTIVE) {
                     if (cancelRecovery) {
                         recoveryTask = null;
+                        notifyAll();
                         return null;
                     }
                     wait();
@@ -398,8 +409,15 @@
             }
             synchronized (this) {
                 try {
+                    if (cancelRecovery) {
+                        recoveryTask = null;
+                        notifyAll();
+                        return null;
+                    }
                     setState(ActivityState.RECOVERING);
                     doStart(metadataProvider);
+                    recoveryTask = null;
+                    notifyAll();
                     return null;
                 } catch (Exception e) {
                     LOGGER.log(level, "Attempt to revive " + entityId + " failed", e);
@@ -411,6 +429,14 @@
                 notifyAll();
             }
         }
+        // Recovery task is essntially over now either through failure or through cancellation(stop)
+        synchronized (this) {
+            recoveryTask = null;
+            notifyAll();
+            if (state != ActivityState.TEMPORARILY_FAILED) {
+                return null;
+            }
+        }
         IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
         try {
             lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
@@ -422,7 +448,6 @@
             synchronized (this) {
                 if (state == ActivityState.TEMPORARILY_FAILED) {
                     setState(ActivityState.PERMANENTLY_FAILED);
-                    recoveryTask = null;
                 }
                 notifyAll();
             }
@@ -464,49 +489,40 @@
             throws HyracksDataException, AlgebricksException;
 
     @Override
-    public void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
-        Future<Void> aRecoveryTask = null;
-        synchronized (this) {
-            waitForNonTransitionState();
-            if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
-                    && state != ActivityState.TEMPORARILY_FAILED) {
-                throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
-            }
-            if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
-                if (recoveryTask != null) {
-                    aRecoveryTask = recoveryTask;
-                    cancelRecovery = true;
-                    recoveryTask.cancel(true);
-                }
-                setState(ActivityState.STOPPED);
-                try {
-                    setRunning(metadataProvider, false);
-                } catch (Exception e) {
-                    LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e);
-                    throw HyracksDataException.create(e);
-                }
-            } else if (state == ActivityState.RUNNING) {
-                setState(ActivityState.STOPPING);
-                try {
-                    doStop(metadataProvider);
-                    setRunning(metadataProvider, false);
-                } catch (Exception e) {
-                    setState(ActivityState.PERMANENTLY_FAILED);
-                    LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e);
-                    throw HyracksDataException.create(e);
-                }
-            } else {
-                throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
-            }
+    public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
+        waitForNonTransitionState();
+        if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
+                && state != ActivityState.TEMPORARILY_FAILED) {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
         }
-        try {
-            if (aRecoveryTask != null) {
-                aRecoveryTask.get();
+        if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
+            if (recoveryTask != null) {
+                setState(ActivityState.STOPPING);
+                cancelRecovery = true;
+                recoveryTask.cancel(true);
+                while (recoveryTask != null) {
+                    wait();
+                }
             }
-        } catch (InterruptedException e) {
-            throw e;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            setState(ActivityState.STOPPED);
+            try {
+                setRunning(metadataProvider, false);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e);
+                throw HyracksDataException.create(e);
+            }
+        } else if (state == ActivityState.RUNNING) {
+            setState(ActivityState.STOPPING);
+            try {
+                doStop(metadataProvider);
+                setRunning(metadataProvider, false);
+            } catch (Exception e) {
+                setState(ActivityState.PERMANENTLY_FAILED);
+                LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e);
+                throw HyracksDataException.create(e);
+            }
+        } else {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index a256bcf..d38a363 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -302,6 +302,7 @@
         listener.onStop(Behavior.SUCCEED);
         Action stopAction = users[2].stopActivity(listener);
         stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -423,8 +424,10 @@
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
         listener.onStop(Behavior.SUCCEED);
         WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
-        users[1].stopActivity(listener);
+        Action stopAction = users[1].stopActivity(listener);
         subscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -485,10 +488,12 @@
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         listener.onStop(Behavior.SUCCEED);
-        users[0].stopActivity(listener);
+        Action stopAction = users[0].stopActivity(listener);
         listener.allowStep();
         runningSubscriber.sync();
         stopSubscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -511,10 +516,12 @@
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         listener.onStop(Behavior.SUCCEED);
-        users[0].stopActivity(listener);
+        Action stopAction = users[0].stopActivity(listener);
         listener.allowStep();
         secondTempFailSubscriber.sync();
         stopSubscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -537,10 +544,12 @@
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         listener.onStop(Behavior.SUCCEED);
-        users[0].stopActivity(listener);
+        Action stopAction = users[0].stopActivity(listener);
         listener.allowStep();
         secondTempFailSubscriber.sync();
         stopSubscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 85ba115..bf0e1dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -23,7 +23,7 @@
 
 public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
 
-    private static final int MAX_SIZE = 1024 * 1024 * 64;
+    private static final int MAX_SIZE = 1024 * 1024 * 32;
     private static final double BUFFER_INCREMENT_FACTOR = 1.5;
 
     public ByteArrayAccessibleOutputStream() {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index ca20f4a..e190bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.http.api.IServlet;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -83,6 +84,8 @@
         executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(requestQueueSize),
                 runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
+        long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK;
+        LOGGER.log(Level.INFO, "The direct memory budget for this server is " + directMemoryBudget + " bytes");
     }
 
     public final void start() throws Exception { // NOSONAR
@@ -194,6 +197,7 @@
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
         channel = b.bind(port).sync().channel();
@@ -255,7 +259,7 @@
     }
 
     protected HttpServerHandler createHttpHandler(int chunkSize) {
-        return new HttpServerHandler<>(this, chunkSize);
+        return new HttpServerHandler(this, chunkSize);
     }
 
     public ExecutorService getExecutor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index 30df003..863eddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -18,7 +18,13 @@
  */
 package org.apache.hyracks.http.servlet;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.reflect.Field;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -27,10 +33,13 @@
 import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.internal.PlatformDependent;
 
 public class ChattyServlet extends AbstractServlet {
     private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName());
+    private static long MAX = 0L;
     private byte[] bytes;
 
     public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
@@ -52,5 +61,57 @@
         for (int i = 0; i < 100; i++) {
             response.outputStream().write(bytes);
         }
+        printMemUsage();
+    }
+
+    @SuppressWarnings("restriction")
+    public synchronized static void printMemUsage() {
+        StringBuilder report = new StringBuilder();
+        report.append("sun.misc.VM.maxDirectMemory: ");
+        report.append(sun.misc.VM.maxDirectMemory());
+        report.append('\n');
+        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): ");
+        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
+        report.append('\n');
+        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): ");
+        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
+        report.append('\n');
+        report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
+        report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
+        report.append('\n');
+        report.append("---------------------------- Beans ----------------------------");
+        report.append('\n');
+        List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
+        for (MemoryPoolMXBean bean : memPoolBeans) {
+            if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
+                report.append(bean.getName());
+                report.append(": ");
+                report.append(bean.getUsage());
+                report.append('\n');
+            }
+        }
+        report.append("---------------------------- Netty ----------------------------");
+        report.append('\n');
+        try {
+            Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+            field.setAccessible(true);
+            AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
+            long used = usedDirectMemory.get();
+            report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            report.append(used);
+            report.append('\n');
+            report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            MAX = Math.max(MAX, used);
+            report.append(MAX);
+            report.append('\n');
+            report.append('\n');
+        } catch (Throwable th) {
+            th.printStackTrace();
+            LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th);
+            return;
+        }
+        report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------");
+        report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
+        LOGGER.log(Level.INFO, report.toString());
     }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 854980e..6512dc1 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -94,6 +94,7 @@
 
     @Test
     public void testChattyServer() throws Exception {
+        ChattyServlet.printMemUsage();
         int numRequests = 64;
         int numExecutors = 32;
         int serverQueueSize = 32;
