[NO ISSUE] Refactoring / cleanup of HTTP cluster state / diagnostics APIs
Change-Id: If47ec45bf88a39d63421903080fee3ddc0f1e42b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1969
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index 9d2415d..b64b6f6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -34,25 +34,32 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import io.netty.handler.codec.http.HttpResponseStatus;
public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
private static final Logger LOGGER = Logger.getLogger(DiagnosticsApiServlet.class.getName());
- private final ICcApplicationContext appCtx;
+ protected final ObjectMapper om;
+ protected final IHyracksClientConnection hcc;
+ protected final ExecutorService executor;
- public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+ public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
super(ctx, paths);
- this.appCtx = appCtx;
+ this.om = new ObjectMapper();
+ this.hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ this.executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
}
@Override
@@ -60,15 +67,13 @@
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
PrintWriter responseWriter = response.writer();
ObjectNode json;
- ObjectMapper om = new ObjectMapper();
response.setStatus(HttpResponseStatus.OK);
om.enable(SerializationFeature.INDENT_OUTPUT);
try {
if (!"".equals(localPath(request))) {
throw new IllegalArgumentException();
}
- json = getClusterDiagnosticsJSON();
- responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
+ responseWriter.write(JSONUtil.convertNode(getClusterDiagnosticsJSON()));
} catch (IllegalStateException e) { // NOSONAR - exception not logged or rethrown
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
} catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
@@ -81,44 +86,53 @@
responseWriter.flush();
}
- private ObjectNode getClusterDiagnosticsJSON() throws Exception {
- ObjectMapper om = new ObjectMapper();
- IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- ExecutorService executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE_ATTR);
- Map<String, Future<ObjectNode>> ccFutureData = new HashMap<>();
+ protected ObjectNode getClusterDiagnosticsJSON() throws Exception {
+ Map<String, Future<JsonNode>> ccFutureData;
+ ccFutureData = getCcDiagosticsFutures();
+
+ Map<String, Map<String, Future<JsonNode>>> ncDataMap = new HashMap<>();
+ for (String nc : ClusterStateManager.INSTANCE.getParticipantNodes()) {
+ ncDataMap.put(nc, getNcDiagnosticFutures(nc));
+ }
+ ObjectNode result = om.createObjectNode();
+ result.putPOJO("cc", resolveFutures(ccFutureData));
+ List<Map<String, ?>> ncList = new ArrayList<>();
+ for (Map.Entry<String, Map<String, Future<JsonNode>>> entry : ncDataMap.entrySet()) {
+ final Map<String, JsonNode> ncMap = resolveFutures(entry.getValue());
+ ncMap.put("node_id", new TextNode(entry.getKey()));
+ ncList.add(ncMap);
+ }
+ result.putPOJO("ncs", ncList);
+ result.put("date", String.valueOf(new Date()));
+ return result;
+ }
+
+ protected Map<String, Future<JsonNode>> getNcDiagnosticFutures(String nc) {
+ Map<String, Future<JsonNode>> ncData;
+ ncData = new HashMap<>();
+ ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
+ ncData.put("config",
+ executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
+ ncData.put("stats", executor.submit(() -> fixupKeys(processNodeStats(hcc, nc))));
+ return ncData;
+ }
+
+ protected Map<String, Future<JsonNode>> getCcDiagosticsFutures() {
+ Map<String, Future<JsonNode>> ccFutureData;
+ ccFutureData = new HashMap<>();
ccFutureData.put("threaddump",
executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(null)))));
ccFutureData.put("config",
executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(null, false, true)))));
ccFutureData.put("stats",
executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(null, true, false)))));
-
- Map<String, Map<String, Future<ObjectNode>>> ncDataMap = new HashMap<>();
- for (String nc : appCtx.getMetadataProperties().getNodeNames()) {
- Map<String, Future<ObjectNode>> ncData = new HashMap<>();
- ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
- ncData.put("config", executor
- .submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
- ncData.put("stats", executor.submit(() -> fixupKeys(processNodeStats(hcc, nc))));
- ncDataMap.put(nc, ncData);
- }
- ObjectNode result = om.createObjectNode();
- result.putPOJO("cc", resolveFutures(ccFutureData));
- List<Map<String, ?>> ncList = new ArrayList<>();
- for (Map.Entry<String, Map<String, Future<ObjectNode>>> entry : ncDataMap.entrySet()) {
- final Map<String, Object> ncMap = resolveFutures(entry.getValue());
- ncMap.put("node_id", entry.getKey());
- ncList.add(ncMap);
- }
- result.putPOJO("ncs", ncList);
- result.putPOJO("date", new Date());
- return result;
+ return ccFutureData;
}
- private Map<String, Object> resolveFutures(Map<String, Future<ObjectNode>> futureMap)
+ protected Map<String, JsonNode> resolveFutures(Map<String, Future<JsonNode>> futureMap)
throws ExecutionException, InterruptedException {
- Map<String, Object> result = new HashMap<>();
- for (Map.Entry<String, Future<ObjectNode>> entry : futureMap.entrySet()) {
+ Map<String, JsonNode> result = new HashMap<>();
+ for (Map.Entry<String, Future<JsonNode>> entry : futureMap.entrySet()) {
result.put(entry.getKey(), entry.getValue().get());
}
return result;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index 6291869..01c59f3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -180,8 +180,9 @@
for (int i = 0; i < gcNames.size(); i++) {
ObjectNode gc = om.createObjectNode();
gc.set("name", gcNames.get(i));
- gc.set("collection-time", ((ArrayNode) gcCollectionTimes.get(i)).get(index));
- gc.set("collection-count", ((ArrayNode) gcCollectionCounts.get(i)).get(index));
+ gc.set("collection-time", gcCollectionTimes.get(i).get(index));
+ gc.set("collection-count", gcCollectionCounts.get(i).get(index));
+ fixupKeys(gc);
gcs.add(gc);
}
json.set("gcs", gcs);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9040ad1..b08c1e2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -313,7 +313,7 @@
case Servlets.CLUSTER_STATE_CC_DETAIL:
return new ClusterControllerDetailsApiServlet(ctx, paths);
case Servlets.DIAGNOSTICS:
- return new DiagnosticsApiServlet(ctx, paths, appCtx);
+ return new DiagnosticsApiServlet(ctx, paths);
case Servlets.ACTIVE_STATS:
return new ActiveStatsApiServlet(ctx, paths, appCtx);
default:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index be84bc3..c5b9d11 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,7 @@
package org.apache.asterix.test.runtime;
-import static org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSON;
+import static org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSONString;
import java.io.BufferedReader;
import java.io.File;
@@ -190,7 +190,7 @@
}
private static void checkThreadLeaks() throws IOException {
- String threadDump = ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean());
+ String threadDump = ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean());
// Currently we only do sanity check for threads used in the execution engine.
// Later we should check if there are leaked storage threads as well.
if (threadDump.contains("Operator") || threadDump.contains("SuperActivity")
@@ -215,7 +215,7 @@
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
int runFileCount = Integer.parseInt(reader.readLine().trim());
if (runFileCount != 0) {
- System.out.print(takeDumpJSON(ManagementFactory.getThreadMXBean()));
+ System.out.print(takeDumpJSONString(ManagementFactory.getThreadMXBean()));
outputLeakedOpenFiles(processId);
throw new AssertionError("There are " + runFileCount + " leaked run files.");
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/diagnostics_1/diagnostics_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/diagnostics_1/diagnostics_1.1.regexadm
index b2dafd9..b0d6638 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/diagnostics_1/diagnostics_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/diagnostics_1/diagnostics_1.1.regexadm
@@ -1,7 +1,10 @@
.*"cc" : \{.*
+ "config" : \{.*
+ \},.*
"stats" : \{.*
\},.*
"threaddump" : \{.*
- \},.*
- "config" : \{.*
- \}.*"ncs".*"node_id" : "asterix_nc1".*"threaddump".*"node_id" : "asterix_nc2".*"threaddump".*
\ No newline at end of file
+ \}.*
+ },
+ "date" : "[SMTWF][^"]* [^"]*[0-9]{4}",
+ "ncs".*"node_id" : "asterix_nc1".*"threaddump".*"node_id" : "asterix_nc2".*"threaddump".*
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index d7c7be1..b5388c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -55,7 +55,7 @@
if (nodeId == null) {
// null nodeId means the request is for the cluster controller
try {
- callback.setValue(ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean()));
+ callback.setValue(ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean()));
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking CC thread dump", e);
callback.setException(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
index 1d6dbcd..62c6586 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
@@ -18,12 +18,6 @@
*/
package org.apache.hyracks.control.common.utils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
import java.io.IOException;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
@@ -33,12 +27,23 @@
import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class ThreadDumpHelper {
+ private static final ObjectMapper om = new ObjectMapper();
private ThreadDumpHelper() {
+ om.enable(SerializationFeature.INDENT_OUTPUT);
}
- public static String takeDumpJSON(ThreadMXBean threadMXBean) throws IOException {
+ public static String takeDumpJSONString(ThreadMXBean threadMXBean) throws IOException {
+ ObjectNode json = takeDumpJSON(threadMXBean);
+ return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+ }
+
+ public static ObjectNode takeDumpJSON(ThreadMXBean threadMXBean) {
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
List<Map<String, Object>> threads = new ArrayList<>();
@@ -73,9 +78,8 @@
}
threads.add(threadMap);
}
- ObjectMapper om = new ObjectMapper();
ObjectNode json = om.createObjectNode();
- json.put("date", new Date().toString());
+ json.put("date", String.valueOf(new Date()));
json.putPOJO("threads", threads);
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
@@ -86,7 +90,6 @@
if (monitorDeadlockedThreads != null && monitorDeadlockedThreads.length > 0) {
json.putPOJO("monitor_deadlocked_thread_ids", monitorDeadlockedThreads);
}
- om.enable(SerializationFeature.INDENT_OUTPUT);
- return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+ return json;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index 5ebb99a..abde87f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -38,7 +38,7 @@
public void run() {
String result;
try {
- result = ThreadDumpHelper.takeDumpJSON(ncs.getThreadMXBean());
+ result = ThreadDumpHelper.takeDumpJSONString(ncs.getThreadMXBean());
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
result = null;