Set the priority of the following threads to be Thread.MAX_PRIORITY:
1. heartbeat thread at NC
2. IPC network thread
3. work queue thread in CC
Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/389
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 9ae7f77..f2f450d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -35,8 +35,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.xml.sax.InputSource;
-
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -104,6 +102,7 @@
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.xml.sax.InputSource;
public class ClusterControllerService extends AbstractRemoteService {
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
@@ -172,6 +171,7 @@
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
private static final long serialVersionUID = 1L;
+ @Override
protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
return size() > ccConfig.jobHistorySize;
}
@@ -181,11 +181,12 @@
/** history size + 1 is for the case when history size = 0 */
private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+ @Override
protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
return size() > allowedSize;
}
};
- workQueue = new WorkQueue();
+ workQueue = new WorkQueue(Thread.MAX_PRIORITY); // WorkQueue is in charge of heartbeat as well as other events.
this.timer = new Timer(true);
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ICCContext() {
@@ -604,7 +605,7 @@
/**
* Add a deployment run
- *
+ *
* @param deploymentKey
* @param nodeControllerIds
*/
@@ -614,7 +615,7 @@
/**
* Get a deployment run
- *
+ *
* @param deploymentKey
*/
public synchronized DeploymentRun getDeploymentRun(DeploymentId deploymentKey) {
@@ -623,7 +624,7 @@
/**
* Remove a deployment run
- *
+ *
* @param deploymentKey
*/
public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index e0d0eb0..da341a7 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -36,8 +36,14 @@
private boolean stopped;
private AtomicInteger enqueueCount;
private AtomicInteger dequeueCount;
+ private int threadPriority = Thread.MAX_PRIORITY;
- public WorkQueue() {
+ public WorkQueue(int threadPriority) {
+ if (threadPriority != Thread.MAX_PRIORITY && threadPriority != Thread.NORM_PRIORITY
+ && threadPriority != Thread.MIN_PRIORITY) {
+ throw new IllegalArgumentException("Illegal thread priority number.");
+ }
+ this.threadPriority = threadPriority;
queue = new LinkedBlockingQueue<AbstractWork>();
thread = new WorkerThread();
stopSemaphore = new Semaphore(1);
@@ -96,7 +102,7 @@
private class WorkerThread extends Thread {
WorkerThread() {
setDaemon(true);
- setPriority(MAX_PRIORITY);
+ setPriority(threadPriority);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index e7689d4..ab0f16b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -26,6 +26,7 @@
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Hashtable;
@@ -42,7 +43,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -170,11 +170,11 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, ncConfig.nNetThreads,
- ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+ netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
+ ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
lccm = new LifeCycleComponentManager();
- queue = new WorkQueue();
+ queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
@@ -243,11 +243,11 @@
private void init() throws Exception {
ctx.getIOManager().setExecutor(executor);
- datasetPartitionManager = new DatasetPartitionManager
- (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, ncConfig.resultSweepThreshold);
- datasetNetworkManager = new DatasetNetworkManager
- (ncConfig.resultIPAddress, ncConfig.resultPort, datasetPartitionManager,
- ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort);
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+ ncConfig.resultTTL, ncConfig.resultSweepThreshold);
+ datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
+ datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
+ ncConfig.resultPublicPort);
}
@Override
@@ -273,12 +273,11 @@
if (ncConfig.dataPublicIPAddress != null) {
netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
}
- ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
- datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
- .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
- .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
- .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema));
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+ .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -294,6 +293,11 @@
heartbeatTask = new HeartbeatTask(ccs);
+ // Use reflection to set the priority of the timer thread.
+ Field threadField = timer.getClass().getDeclaredField("thread");
+ threadField.setAccessible(true);
+ Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
+ timerThread.setPriority(Thread.MAX_PRIORITY);
// Schedule heartbeat generator.
timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
@@ -571,6 +575,7 @@
this.nodeControllerService = ncAppEntryPoint;
}
+ @Override
public void run() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Shutdown hook in progress");
diff --git a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index dd3d2f9..e02e4f4 100644
--- a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -64,6 +64,7 @@
IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
this.system = system;
this.networkThread = new NetworkThread();
+ this.networkThread.setPriority(Thread.MAX_PRIORITY);
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
@@ -114,8 +115,8 @@
}
} else if (attempt < retries) {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Connection to " + remoteAddress +
- " failed (Attempt " + attempt + " of " + retries + ")");
+ LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries
+ + ")");
attempt++;
Thread.sleep(5000);
}
@@ -308,8 +309,7 @@
if (!channel.finishConnect()) {
throw new Exception("Connection did not finish");
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
handle.setState(HandleState.CONNECT_FAILED);
continue;
diff --git a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 42e19f7..4b6c22f 100644
--- a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -106,7 +106,7 @@
public IOThread() throws IOException {
super("TCPEndpoint IO Thread");
setDaemon(true);
- setPriority(MAX_PRIORITY);
+ setPriority(Thread.NORM_PRIORITY);
this.pendingConnections = new ArrayList<InetSocketAddress>();
this.workingPendingConnections = new ArrayList<InetSocketAddress>();
this.incomingConnections = new ArrayList<SocketChannel>();
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 4f9d9ce..facd8c1 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -475,7 +475,7 @@
private int cleanedCount = 0;
public CleanerThread() {
- setPriority(MAX_PRIORITY);
+ setPriority(Thread.NORM_PRIORITY);
setDaemon(true);
}