[ASTERIXDB-2039][OTH] Log Http Server direct memory budget
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Calculate mem budget = number of executors x max high watermark.
- Log the calculated budget.
Change-Id: I4a324f10db52e77c7e69ca4246b9d84b4479f25d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1941
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 9ace417..6041ab5 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -72,7 +72,7 @@
@Override
public String toString() {
- return ActivePartitionMessage.class.getSimpleName() + event;
+ return ActivePartitionMessage.class.getSimpleName() + '-' + event;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index e30272c..c6f41bf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -362,7 +362,16 @@
cancelRecovery = false;
setState(ActivityState.TEMPORARILY_FAILED);
LOGGER.log(level, "Recovery task has been submitted");
- recoveryTask = executor.submit(() -> doRecover(policy));
+ recoveryTask = executor.submit(() -> {
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("RecoveryTask (" + entityId + ")");
+ doRecover(policy);
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
+ return null;
+ });
}
}
@@ -378,11 +387,13 @@
synchronized (this) {
if (cancelRecovery) {
recoveryTask = null;
+ notifyAll();
return null;
}
while (clusterStateManager.getState() != ClusterState.ACTIVE) {
if (cancelRecovery) {
recoveryTask = null;
+ notifyAll();
return null;
}
wait();
@@ -398,8 +409,15 @@
}
synchronized (this) {
try {
+ if (cancelRecovery) {
+ recoveryTask = null;
+ notifyAll();
+ return null;
+ }
setState(ActivityState.RECOVERING);
doStart(metadataProvider);
+ recoveryTask = null;
+ notifyAll();
return null;
} catch (Exception e) {
LOGGER.log(level, "Attempt to revive " + entityId + " failed", e);
@@ -411,6 +429,14 @@
notifyAll();
}
}
+ // Recovery task is essntially over now either through failure or through cancellation(stop)
+ synchronized (this) {
+ recoveryTask = null;
+ notifyAll();
+ if (state != ActivityState.TEMPORARILY_FAILED) {
+ return null;
+ }
+ }
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
try {
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
@@ -422,7 +448,6 @@
synchronized (this) {
if (state == ActivityState.TEMPORARILY_FAILED) {
setState(ActivityState.PERMANENTLY_FAILED);
- recoveryTask = null;
}
notifyAll();
}
@@ -464,49 +489,40 @@
throws HyracksDataException, AlgebricksException;
@Override
- public void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
- Future<Void> aRecoveryTask = null;
- synchronized (this) {
- waitForNonTransitionState();
- if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
- && state != ActivityState.TEMPORARILY_FAILED) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
- }
- if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
- if (recoveryTask != null) {
- aRecoveryTask = recoveryTask;
- cancelRecovery = true;
- recoveryTask.cancel(true);
- }
- setState(ActivityState.STOPPED);
- try {
- setRunning(metadataProvider, false);
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e);
- throw HyracksDataException.create(e);
- }
- } else if (state == ActivityState.RUNNING) {
- setState(ActivityState.STOPPING);
- try {
- doStop(metadataProvider);
- setRunning(metadataProvider, false);
- } catch (Exception e) {
- setState(ActivityState.PERMANENTLY_FAILED);
- LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e);
- throw HyracksDataException.create(e);
- }
- } else {
- throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
- }
+ public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
+ waitForNonTransitionState();
+ if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
+ && state != ActivityState.TEMPORARILY_FAILED) {
+ throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
}
- try {
- if (aRecoveryTask != null) {
- aRecoveryTask.get();
+ if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
+ if (recoveryTask != null) {
+ setState(ActivityState.STOPPING);
+ cancelRecovery = true;
+ recoveryTask.cancel(true);
+ while (recoveryTask != null) {
+ wait();
+ }
}
- } catch (InterruptedException e) {
- throw e;
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ setState(ActivityState.STOPPED);
+ try {
+ setRunning(metadataProvider, false);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e);
+ throw HyracksDataException.create(e);
+ }
+ } else if (state == ActivityState.RUNNING) {
+ setState(ActivityState.STOPPING);
+ try {
+ doStop(metadataProvider);
+ setRunning(metadataProvider, false);
+ } catch (Exception e) {
+ setState(ActivityState.PERMANENTLY_FAILED);
+ LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e);
+ throw HyracksDataException.create(e);
+ }
+ } else {
+ throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index a256bcf..d38a363 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -302,6 +302,7 @@
listener.onStop(Behavior.SUCCEED);
Action stopAction = users[2].stopActivity(listener);
stopAction.sync();
+ assertSuccess(stopAction);
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@@ -423,8 +424,10 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
listener.onStop(Behavior.SUCCEED);
WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
- users[1].stopActivity(listener);
+ Action stopAction = users[1].stopActivity(listener);
subscriber.sync();
+ stopAction.sync();
+ assertSuccess(stopAction);
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@@ -485,10 +488,12 @@
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
recoveringSubscriber.sync();
listener.onStop(Behavior.SUCCEED);
- users[0].stopActivity(listener);
+ Action stopAction = users[0].stopActivity(listener);
listener.allowStep();
runningSubscriber.sync();
stopSubscriber.sync();
+ stopAction.sync();
+ assertSuccess(stopAction);
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@@ -511,10 +516,12 @@
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
recoveringSubscriber.sync();
listener.onStop(Behavior.SUCCEED);
- users[0].stopActivity(listener);
+ Action stopAction = users[0].stopActivity(listener);
listener.allowStep();
secondTempFailSubscriber.sync();
stopSubscriber.sync();
+ stopAction.sync();
+ assertSuccess(stopAction);
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@@ -537,10 +544,12 @@
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
recoveringSubscriber.sync();
listener.onStop(Behavior.SUCCEED);
- users[0].stopActivity(listener);
+ Action stopAction = users[0].stopActivity(listener);
listener.allowStep();
secondTempFailSubscriber.sync();
stopSubscriber.sync();
+ stopAction.sync();
+ assertSuccess(stopAction);
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 85ba115..bf0e1dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -23,7 +23,7 @@
public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
- private static final int MAX_SIZE = 1024 * 1024 * 64;
+ private static final int MAX_SIZE = 1024 * 1024 * 32;
private static final double BUFFER_INCREMENT_FACTOR = 1.5;
public ByteArrayAccessibleOutputStream() {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index ca20f4a..e190bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.http.api.IServlet;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -83,6 +84,8 @@
executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(requestQueueSize),
runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
+ long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK;
+ LOGGER.log(Level.INFO, "The direct memory budget for this server is " + directMemoryBudget + " bytes");
}
public final void start() throws Exception { // NOSONAR
@@ -194,6 +197,7 @@
Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
channel = b.bind(port).sync().channel();
@@ -255,7 +259,7 @@
}
protected HttpServerHandler createHttpHandler(int chunkSize) {
- return new HttpServerHandler<>(this, chunkSize);
+ return new HttpServerHandler(this, chunkSize);
}
public ExecutorService getExecutor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index 30df003..863eddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -18,7 +18,13 @@
*/
package org.apache.hyracks.http.servlet;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.reflect.Field;
+import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -27,10 +33,13 @@
import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.hyracks.http.server.utils.HttpUtil;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.internal.PlatformDependent;
public class ChattyServlet extends AbstractServlet {
private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName());
+ private static long MAX = 0L;
private byte[] bytes;
public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
@@ -52,5 +61,57 @@
for (int i = 0; i < 100; i++) {
response.outputStream().write(bytes);
}
+ printMemUsage();
+ }
+
+ @SuppressWarnings("restriction")
+ public synchronized static void printMemUsage() {
+ StringBuilder report = new StringBuilder();
+ report.append("sun.misc.VM.maxDirectMemory: ");
+ report.append(sun.misc.VM.maxDirectMemory());
+ report.append('\n');
+ report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): ");
+ report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
+ report.append('\n');
+ report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): ");
+ report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
+ report.append('\n');
+ report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
+ report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
+ report.append('\n');
+ report.append("---------------------------- Beans ----------------------------");
+ report.append('\n');
+ List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
+ for (MemoryPoolMXBean bean : memPoolBeans) {
+ if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
+ report.append(bean.getName());
+ report.append(": ");
+ report.append(bean.getUsage());
+ report.append('\n');
+ }
+ }
+ report.append("---------------------------- Netty ----------------------------");
+ report.append('\n');
+ try {
+ Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+ field.setAccessible(true);
+ AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
+ long used = usedDirectMemory.get();
+ report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+ report.append(used);
+ report.append('\n');
+ report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+ MAX = Math.max(MAX, used);
+ report.append(MAX);
+ report.append('\n');
+ report.append('\n');
+ } catch (Throwable th) {
+ th.printStackTrace();
+ LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th);
+ return;
+ }
+ report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------");
+ report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
+ LOGGER.log(Level.INFO, report.toString());
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 854980e..6512dc1 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -94,6 +94,7 @@
@Test
public void testChattyServer() throws Exception {
+ ChattyServlet.printMemUsage();
int numRequests = 64;
int numExecutors = 32;
int serverQueueSize = 32;