[ASTERIXDB-2229][OTR] Restore Thread Names in Thread Pool
- user model changes: no
- storage format changes: no
- interface changes: no
- Restore thread names to their original names
before returning them to the pool of the NC
app.
- Remove explicit thread name resets.
- Delete unused ThreadExecutor class.
Change-Id: I3bda1b65e7aefd35d2b8cfa814f73369c3bf5a18
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2447
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index dacd0ee..2de8319 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -59,19 +59,12 @@
public Callable<Void> recover() {
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- return () -> {
- return null;
- };
+ return () -> null;
}
IRetryPolicy policy = retryPolicyFactory.create(listener);
return () -> {
- String nameBefore = Thread.currentThread().getName();
- try {
- Thread.currentThread().setName("RecoveryTask (" + listener.getEntityId() + ")");
- doRecover(policy);
- } finally {
- Thread.currentThread().setName(nameBefore);
- }
+ Thread.currentThread().setName("RecoveryTask (" + listener.getEntityId() + ")");
+ doRecover(policy);
return null;
};
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c724952..288e5f2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -28,7 +28,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.asterix.active.ActiveManager;
@@ -102,6 +101,7 @@
import org.apache.hyracks.storage.common.file.FileMapManager;
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -171,7 +171,8 @@
@Override
public void initialize(boolean initialRun) throws IOException, ACIDException {
ioManager = getServiceContext().getIoManager();
- threadExecutor = Executors.newCachedThreadPool(getServiceContext().getThreadFactory());
+ threadExecutor =
+ MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
deleted file mode 100644
index 03cead0..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.api;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-
-public class ThreadExecutor implements Executor {
- private final ExecutorService executorService;
-
- public ThreadExecutor(ThreadFactory threadFactory) {
- executorService = Executors.newCachedThreadPool(threadFactory);
- }
-
- @Override
- public void execute(Runnable command) {
- executorService.execute(command);
- }
-
- public <T> Future<T> submit(Callable<T> command) {
- return executorService.submit(command);
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 6c95cc2..0503677 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -60,7 +60,6 @@
@Override
protected void start() throws HyracksDataException, InterruptedException {
- String before = Thread.currentThread().getName();
Thread.currentThread().setName("Intake Thread");
try {
writer.open();
@@ -85,7 +84,6 @@
throw e;
} finally {
writer.close();
- Thread.currentThread().setName(before);
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 417164c..bc93294 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -106,7 +106,6 @@
@Override
public void run() {
- final String oldName = Thread.currentThread().getName();
Thread.currentThread().setName("Replication Worker");
try {
ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
@@ -124,7 +123,6 @@
LOGGER.warn("Failed to close replication socket.", e);
}
}
- Thread.currentThread().setName(oldName);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index f8fe77f..caef91a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,7 +34,6 @@
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.client.ClusterControllerInfo;
@@ -82,6 +81,7 @@
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -232,7 +232,7 @@
serviceCtx = new CCServiceContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
serviceCtx.addJobLifecycleListener(datasetDirectoryService);
application.init(serviceCtx);
- executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+ executor = MaintainedThreadNameExecutorService.newCachedThreadPool(serviceCtx.getThreadFactory());
application.start(ccConfig.getAppArgsArray());
IJobCapacityController jobCapacityController = application.getJobCapacityController();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0d0c535..b67bfac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -37,7 +37,6 @@
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -94,6 +93,7 @@
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
import org.apache.hyracks.util.PidHelper;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.Tracer;
@@ -480,7 +480,7 @@
serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm,
ncConfig.getNodeScopedAppConfig());
application.init(serviceCtx);
- executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+ executor = MaintainedThreadNameExecutorService.newCachedThreadPool(serviceCtx.getThreadFactory());
application.start(ncConfig.getAppArgsArray());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9b32cc7..dcfc291 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -275,7 +275,6 @@
@Override
public void run() {
Thread ct = Thread.currentThread();
- String threadName = ct.getName();
// Calls synchronized addPendingThread(..) to make sure that in the abort() method,
// the thread is not escaped from interruption.
if (!addPendingThread(ct)) {
@@ -304,7 +303,6 @@
if (!addPendingThread(thread)) {
return;
}
- String oldName = thread.getName();
thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
thread.setPriority(Thread.MIN_PRIORITY);
try {
@@ -314,7 +312,6 @@
exceptions.add(e);
}
} finally {
- thread.setName(oldName);
sem.release();
removePendingThread(thread);
}
@@ -351,7 +348,6 @@
} catch (Exception e) {
exceptions.add(e);
} finally {
- ct.setName(threadName);
close();
removePendingThread(ct);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index a782bca..29c2ff2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -88,7 +88,6 @@
public void run() {
Thread thread = Thread.currentThread();
setDataConsumerThread(thread); // Sets the data consumer thread to the current thread.
- String oldName = thread.getName();
try {
thread.setName(MaterializingPipelinedPartition.class.getName() + pid);
FileReference fRefCopy;
@@ -167,7 +166,6 @@
} catch (Exception e) {
LOGGER.log(Level.ERROR, e.getMessage(), e);
} finally {
- thread.setName(oldName);
setDataConsumerThread(null); // Sets back the data consumer thread to null.
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java
new file mode 100644
index 0000000..a9ebb50
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class MaintainedThreadNameExecutorService extends ThreadPoolExecutor {
+
+ private final Map<Thread, String> threadNames = new ConcurrentHashMap<>();
+
+ private MaintainedThreadNameExecutorService(ThreadFactory threadFactory) {
+ super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);
+ }
+
+ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return new MaintainedThreadNameExecutorService(threadFactory);
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ threadNames.put(t, t.getName());
+ super.beforeExecute(t, r);
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ final Thread thread = Thread.currentThread();
+ final String originalThreadName = threadNames.remove(thread);
+ if (originalThreadName != null) {
+ thread.setName(originalThreadName);
+ }
+ }
+}