[NO ISSUE] Minor active refactoring
- Remove unused ActiveRuntimeManager
- Rename StatsRequestMessage -> ActiveStatsRequestMessage
- Add ActiveManager API to return all active runtimes
- Interrupt running HTTP requests after 5s upon shutdown
- Log thread dump when HTTP requests do not complete after interruption
Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2021
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index df59dca..264e9bc 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.active;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -32,7 +34,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -54,7 +56,7 @@
private volatile boolean shutdown;
public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize,
- INCServiceContext serviceCtx) throws HyracksDataException {
+ INCServiceContext serviceCtx) throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
@@ -77,6 +79,10 @@
runtimes.remove(id);
}
+ public Set<ActiveRuntimeId> getRuntimeIds() {
+ return Collections.unmodifiableSet(runtimes.keySet());
+ }
+
public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
return runtimes.get(runtimeId);
}
@@ -93,14 +99,14 @@
stopRuntime(message);
break;
case REQUEST_STATS:
- requestStats((StatsRequestMessage) message);
+ requestStats((ActiveStatsRequestMessage) message);
break;
default:
LOGGER.warning("Unknown message type received: " + message.getKind());
}
}
- private void requestStats(StatsRequestMessage message) throws HyracksDataException {
+ private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException {
try {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -111,9 +117,9 @@
((NodeControllerService) serviceCtx.getControllerService())
.sendApplicationMessageToCC(
JavaSerializationUtils
- .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException(
- ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))),
- null);
+ .serialize(new ActiveStatsResponse(reqId, null,
+ new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
+ runtimeId.toString()))), null);
return;
}
String stats = runtime.getStats();
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
deleted file mode 100644
index 18368ae..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ActiveRuntimeManager {
-
- private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
- private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> activeRuntimes;
-
- private final ExecutorService executorService;
-
- public ActiveRuntimeManager() {
- this.activeRuntimes = new ConcurrentHashMap<>();
- this.executorService = Executors.newCachedThreadPool();
- }
-
- public void close() throws IOException {
- if (executorService != null) {
- executorService.shutdown();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + ActiveRuntimeManager.class.getSimpleName());
- }
- try {
- executorService.awaitTermination(10L, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.log(Level.SEVERE, ActiveRuntimeManager.class.getSimpleName()
- + " was interrupted while waiting for runtime managers to shutdown", e);
- }
- if (!executorService.isTerminated()) {
- LOGGER.severe(ActiveRuntimeManager.class.getSimpleName()
- + " failed to shutdown successfully. Will be forced to shutdown");
- executorService.shutdownNow();
- }
- }
- }
-
- public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId runtimeId) {
- return activeRuntimes.get(runtimeId);
- }
-
- public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime)
- throws HyracksDataException {
- if (activeRuntimes.containsKey(runtimeId)) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, runtimeId);
- }
- activeRuntimes.put(runtimeId, feedRuntime);
- }
-
- public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException {
- if (!activeRuntimes.containsKey(runtimeId)) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId);
- }
- activeRuntimes.remove(runtimeId);
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public Set<ActiveRuntimeId> getFeedRuntimes() {
- return activeRuntimes.keySet();
- }
-
-}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
similarity index 88%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index d43f00e..0dbba52 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -20,11 +20,11 @@
import java.io.Serializable;
-public class StatsRequestMessage extends ActiveManagerMessage {
+public class ActiveStatsRequestMessage extends ActiveManagerMessage {
private static final long serialVersionUID = 1L;
private final long reqId;
- public StatsRequestMessage(Serializable payload, long reqId) {
+ public ActiveStatsRequestMessage(Serializable payload, long reqId) {
super(Kind.REQUEST_STATS, payload);
this.reqId = reqId;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index ebc2db5..254b6f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
@@ -132,8 +133,8 @@
CancelQueryRequest cancelQueryMessage =
new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
messageBroker.sendMessageToCC(cancelQueryMessage);
- cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
+ TimeUnit.MILLISECONDS);
} catch (Exception e) {
exception.addSuppressed(e);
} finally {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 77f2b23..73d0840 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,7 +38,7 @@
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -289,7 +289,7 @@
List<INcAddressedMessage> requests = new ArrayList<>();
List<String> ncs = Arrays.asList(locations.getLocations());
for (int i = 0; i < ncs.size(); i++) {
- requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+ requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
}
try {
List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index ce57a0c..28e55a6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -62,7 +62,7 @@
//TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062
public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
//TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063
- public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+ public static final long DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS = TimeUnit.MINUTES.toMillis(0);
private final String requestNodeId;
private final long requestMessageId;
private final ILangExtension.Language lang;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index c5b9d11..03f42f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,7 @@
package org.apache.asterix.test.runtime;
-import static org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSONString;
+import static org.apache.hyracks.util.ThreadDumpUtil.takeDumpJSONString;
import java.io.BufferedReader;
import java.io.File;
@@ -39,7 +39,7 @@
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.nc.NodeControllerService;
/**
@@ -190,7 +190,7 @@
}
private static void checkThreadLeaks() throws IOException {
- String threadDump = ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean());
+ String threadDump = ThreadDumpUtil.takeDumpJSONString();
// Currently we only do sanity check for threads used in the execution engine.
// Later we should check if there are leaked storage threads as well.
if (threadDump.contains("Operator") || threadDump.contains("SuperActivity")
@@ -215,7 +215,7 @@
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
int runFileCount = Integer.parseInt(reader.readLine().trim());
if (runFileCount != 0) {
- System.out.print(takeDumpJSONString(ManagementFactory.getThreadMXBean()));
+ System.out.print(takeDumpJSONString());
outputLeakedOpenFiles(processId);
throw new AssertionError("There are " + runFileCount + " leaked run files.");
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index fd5dba2..b1a5ff2 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -63,7 +63,7 @@
27 = Operation not supported
28 = Invalid duration %1$s
29 = Unknown duration unit %1$s
-30 = Query timed out
+30 = Query timed out and will be cancelled
100 = Unable to instantiate class %1$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index c85e236..824f51a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -36,6 +36,9 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
+ public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = "incoming-records-count";
+ public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = "failed-at-parser-records-count";
+
public enum State {
CREATED,
STARTED,
@@ -278,7 +281,7 @@
@Override
public String getStats() {
- return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": "
- + failedRecordsCount + "}";
+ return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + incomingRecordsCount + ", \"" +
+ FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + failedRecordsCount + "}";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index b5388c2..407f9cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.control.cc.work;
-import java.lang.management.ManagementFactory;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -29,7 +28,7 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -55,7 +54,7 @@
if (nodeId == null) {
// null nodeId means the request is for the cluster controller
try {
- callback.setValue(ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean()));
+ callback.setValue(ThreadDumpUtil.takeDumpJSONString());
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking CC thread dump", e);
callback.setException(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index ed5598b..a426d47 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -210,7 +210,7 @@
osMXBean = ManagementFactory.getOperatingSystemMXBean();
getNodeControllerInfosAcceptor = new MutableObject<>();
memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
- ioCounter = new IOCounterFactory().getIOCounter();
+ ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
}
public IOManager getIoManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
index 1b7cf8f..2301ae6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -21,6 +21,11 @@
public class IOCounterFactory {
+ public static final IOCounterFactory INSTANCE = new IOCounterFactory();
+
+ private IOCounterFactory() {
+ }
+
/**
* Get the IOCounter for the specific underlying OS
*
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index abde87f..e23aaaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -21,7 +21,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.nc.NodeControllerService;
public class ThreadDumpTask implements Runnable {
@@ -38,7 +38,7 @@
public void run() {
String result;
try {
- result = ThreadDumpHelper.takeDumpJSONString(ncs.getThreadMXBean());
+ result = ThreadDumpUtil.takeDumpJSONString();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
result = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index ed0e8c8..09bf513 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -66,5 +66,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 44d4dfe..645bc01 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -31,6 +31,7 @@
import java.util.logging.Logger;
import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.util.ThreadDumpUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -218,11 +219,22 @@
}
protected void doStop() throws InterruptedException {
+ // stop taking new requests
executor.shutdown();
try {
- executor.awaitTermination(1, TimeUnit.MINUTES);
+ // wait 5s before interrupting existing requests
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ // interrupt
+ executor.shutdownNow();
+ // wait 30s for interrupted requests to unwind
+ executor.awaitTermination(30, TimeUnit.SECONDS);
if (!executor.isTerminated()) {
- LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor; thread dump: " +
+ ThreadDumpUtil.takeDumpString());
+ } else {
+ LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor");
+ }
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error while shutting down http server executor", e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
similarity index 84%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
rename to hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
index 62c6586..ec1a0b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.utils;
+package org.apache.hyracks.util;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
@@ -26,24 +27,26 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
-public class ThreadDumpHelper {
+public class ThreadDumpUtil {
private static final ObjectMapper om = new ObjectMapper();
+ private static final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
- private ThreadDumpHelper() {
+ private ThreadDumpUtil() {
om.enable(SerializationFeature.INDENT_OUTPUT);
}
- public static String takeDumpJSONString(ThreadMXBean threadMXBean) throws IOException {
- ObjectNode json = takeDumpJSON(threadMXBean);
+ public static String takeDumpJSONString() throws IOException {
+ ObjectNode json = takeDumpJSON();
return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
}
- public static ObjectNode takeDumpJSON(ThreadMXBean threadMXBean) {
+ public static ObjectNode takeDumpJSON() {
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
List<Map<String, Object>> threads = new ArrayList<>();
@@ -92,4 +95,10 @@
}
return json;
}
+
+ public static String takeDumpString() {
+ StringBuilder buf = new StringBuilder(2048);
+ Stream.of(threadMXBean.dumpAllThreads(true, true)).forEach(buf::append);
+ return buf.toString();
+ }
}