Set the priority of the following threads to be Thread.MAX_PRIORITY:
1. heartbeat thread at NC
2. IPC network thread
3. work queue thread in CC

Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/389
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 9ae7f77..f2f450d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -35,8 +35,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.xml.sax.InputSource;
-
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -104,6 +102,7 @@
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.xml.sax.InputSource;
 
 public class ClusterControllerService extends AbstractRemoteService {
     private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
@@ -172,6 +171,7 @@
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
             private static final long serialVersionUID = 1L;
 
+            @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
                 return size() > ccConfig.jobHistorySize;
             }
@@ -181,11 +181,12 @@
             /** history size + 1 is for the case when history size = 0 */
             private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
 
+            @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
                 return size() > allowedSize;
             }
         };
-        workQueue = new WorkQueue();
+        workQueue = new WorkQueue(Thread.MAX_PRIORITY); // WorkQueue is in charge of heartbeat as well as other events.
         this.timer = new Timer(true);
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ICCContext() {
@@ -604,7 +605,7 @@
 
     /**
      * Add a deployment run
-     * 
+     *
      * @param deploymentKey
      * @param nodeControllerIds
      */
@@ -614,7 +615,7 @@
 
     /**
      * Get a deployment run
-     * 
+     *
      * @param deploymentKey
      */
     public synchronized DeploymentRun getDeploymentRun(DeploymentId deploymentKey) {
@@ -623,7 +624,7 @@
 
     /**
      * Remove a deployment run
-     * 
+     *
      * @param deploymentKey
      */
     public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index e0d0eb0..da341a7 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -36,8 +36,14 @@
     private boolean stopped;
     private AtomicInteger enqueueCount;
     private AtomicInteger dequeueCount;
+    private int threadPriority = Thread.MAX_PRIORITY;
 
-    public WorkQueue() {
+    public WorkQueue(int threadPriority) {
+        if (threadPriority != Thread.MAX_PRIORITY && threadPriority != Thread.NORM_PRIORITY
+                && threadPriority != Thread.MIN_PRIORITY) {
+            throw new IllegalArgumentException("Illegal thread priority number.");
+        }
+        this.threadPriority = threadPriority;
         queue = new LinkedBlockingQueue<AbstractWork>();
         thread = new WorkerThread();
         stopSemaphore = new Semaphore(1);
@@ -96,7 +102,7 @@
     private class WorkerThread extends Thread {
         WorkerThread() {
             setDaemon(true);
-            setPriority(MAX_PRIORITY);
+            setPriority(threadPriority);
         }
 
         @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index e7689d4..ab0f16b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -26,6 +26,7 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Hashtable;
@@ -42,7 +43,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -170,11 +170,11 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, ncConfig.nNetThreads,
-                                        ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
+                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
 
         lccm = new LifeCycleComponentManager();
-        queue = new WorkQueue();
+        queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
@@ -243,11 +243,11 @@
 
     private void init() throws Exception {
         ctx.getIOManager().setExecutor(executor);
-        datasetPartitionManager = new DatasetPartitionManager
-            (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, ncConfig.resultSweepThreshold);
-        datasetNetworkManager = new DatasetNetworkManager
-            (ncConfig.resultIPAddress, ncConfig.resultPort, datasetPartitionManager,
-             ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort);
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+                ncConfig.resultTTL, ncConfig.resultSweepThreshold);
+        datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
+                datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
+                ncConfig.resultPublicPort);
     }
 
     @Override
@@ -273,12 +273,11 @@
         if (ncConfig.dataPublicIPAddress != null) {
             netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
         }
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+                        .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -294,6 +293,11 @@
 
         heartbeatTask = new HeartbeatTask(ccs);
 
+        // Use reflection to set the priority of the timer thread.
+        Field threadField = timer.getClass().getDeclaredField("thread");
+        threadField.setAccessible(true);
+        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
+        timerThread.setPriority(Thread.MAX_PRIORITY);
         // Schedule heartbeat generator.
         timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
@@ -571,6 +575,7 @@
             this.nodeControllerService = ncAppEntryPoint;
         }
 
+        @Override
         public void run() {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Shutdown hook in progress");
diff --git a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index dd3d2f9..e02e4f4 100644
--- a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -64,6 +64,7 @@
     IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
         this.system = system;
         this.networkThread = new NetworkThread();
+        this.networkThread.setPriority(Thread.MAX_PRIORITY);
         this.serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.socket().setReuseAddress(true);
         serverSocketChannel.configureBlocking(false);
@@ -114,8 +115,8 @@
                 }
             } else if (attempt < retries) {
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress +
-                            " failed (Attempt " + attempt + " of " + retries + ")");
+                    LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries
+                            + ")");
                     attempt++;
                     Thread.sleep(5000);
                 }
@@ -308,8 +309,7 @@
                                     if (!channel.finishConnect()) {
                                         throw new Exception("Connection did not finish");
                                     }
-                                }
-                                catch (Exception e) {
+                                } catch (Exception e) {
                                     e.printStackTrace();
                                     handle.setState(HandleState.CONNECT_FAILED);
                                     continue;
diff --git a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 42e19f7..4b6c22f 100644
--- a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -106,7 +106,7 @@
         public IOThread() throws IOException {
             super("TCPEndpoint IO Thread");
             setDaemon(true);
-            setPriority(MAX_PRIORITY);
+            setPriority(Thread.NORM_PRIORITY);
             this.pendingConnections = new ArrayList<InetSocketAddress>();
             this.workingPendingConnections = new ArrayList<InetSocketAddress>();
             this.incomingConnections = new ArrayList<SocketChannel>();
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 4f9d9ce..facd8c1 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -475,7 +475,7 @@
         private int cleanedCount = 0;
 
         public CleanerThread() {
-            setPriority(MAX_PRIORITY);
+            setPriority(Thread.NORM_PRIORITY);
             setDaemon(true);
         }