[NO ISSUE][OTH] Logging enhancements + query job logging

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add context to cancel request/response.
- Reduce some JobWork to TRACE.
- Avoid NPE when closing the pipeline.
- Add method to get Job Queue size.
- Add method to IJobCapacityController to get cluster
  current capacity for logging.
- Remove not useful logs.

Change-Id: Iec2c7af1b14bf99b1d11b4178be66da860dfcbf4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 626b938..63d8846 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -153,9 +153,8 @@
     @Override
     public synchronized void notify(ActiveEvent event) {
         try {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "EventListener is notified.");
-            }
+            LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState,
+                    suspended);
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
@@ -194,26 +193,21 @@
 
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Active job {} finished", jobId);
-        }
         JobId lastJobId = jobId;
+        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
         if (numRegistered != numDeRegistered) {
             LOGGER.log(Level.WARN,
-                    "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
-                    numRegistered, numDeRegistered);
+                    "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}",
+                    jobId, status, numRegistered, numDeRegistered);
         }
         jobId = null;
-        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus);
-        }
+        LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus);
         if (!jobSuccessfullyTerminated(jobStatus)) {
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
-            LOGGER.error("Active Job {} failed", lastJobId, jobFailure);
+            LOGGER.error("ingestion job {} failed", lastJobId, jobFailure);
             setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED
                     : ActivityState.TEMPORARILY_FAILED);
             if (prevState == ActivityState.RUNNING) {
@@ -371,16 +365,14 @@
 
     @Override
     public synchronized void recover() {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Recover is called on {}", entityId);
-        }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-            LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
+            LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId);
             setState(ActivityState.STOPPED);
         } else {
+            LOGGER.debug("recover is called on {}", entityId);
             ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
             setState(ActivityState.TEMPORARILY_FAILED);
-            LOGGER.log(level, "Recovery task has been submitted");
+            LOGGER.debug("recovery task has been submitted");
             rt = createRecoveryTask();
             executor.submit(rt.recover());
         }
@@ -481,13 +473,9 @@
         try {
             Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
             sendStopMessages(metadataProvider, timeout, unit);
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Waiting for its state to become " + waitFor);
-            }
+            LOGGER.debug("waiting for {} to become {}", jobId, waitFor);
             subscriber.sync();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Disconnect has been completed " + waitFor);
-            }
+            LOGGER.debug("disconnect has been completed {}", waitFor);
         } catch (InterruptedException ie) {
             forceStop(subscriber, ie);
             Thread.currentThread().interrupt();
@@ -513,13 +501,8 @@
         ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker();
         AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations();
         int partition = 0;
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
-        }
+        LOGGER.log(Level.INFO, "Sending stop messages to {}", runtimeLocations);
         for (String location : runtimeLocations.getLocations()) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.log(Level.INFO, "Sending to " + location);
-            }
             ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
             messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
                     runtimeId, new StopRuntimeParameters(timeout, unit)), location);
@@ -581,14 +564,10 @@
         WaitForStateSubscriber subscriber;
         Future<Void> suspendTask;
         synchronized (this) {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "suspending entity " + entityId);
-                LOGGER.log(level, "Waiting for ongoing activities");
-            }
+            LOGGER.log(level, "{} Suspending entity {}", jobId, entityId);
+            LOGGER.log(level, "{} Waiting for ongoing activities", jobId);
             waitForNonTransitionState();
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
-            }
+            LOGGER.log(level, "{} Proceeding with suspension. Current state is {}", jobId, state);
             if (state == ActivityState.STOPPED) {
                 suspended = true;
                 return;
@@ -609,12 +588,12 @@
                         doSuspend(metadataProvider);
                         return null;
                     });
-            LOGGER.log(level, "Suspension task has been submitted");
+            LOGGER.log(level, "{} Suspension task has been submitted", jobId);
         }
         try {
-            LOGGER.log(level, "Waiting for suspension task to complete");
+            LOGGER.log(level, "{} Waiting for suspension task to complete", jobId);
             suspendTask.get();
-            LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
+            LOGGER.log(level, "{} Waiting for state to become SUSPENDED or TEMPORARILY_FAILED", jobId);
             subscriber.sync();
             suspended = true;
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 3c277d5..1f31698 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -92,16 +92,14 @@
 
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "notifyJobCreation was called for job {}", jobId);
-        }
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property);
+            if (property != null) {
+                LOGGER.debug("{} is not an active job. job property={}", jobId, property);
             }
             return;
         }
+        LOGGER.debug("notified of ingestion job creation {}", jobId);
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
         boolean found = jobId2EntityId.get(jobId) != null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 65d1039..04e661b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -82,4 +82,9 @@
         }
     }
 
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s, uuid=%s, contextId=%s, node=%s)", getClass().getSimpleName(), reqId, uuid,
+                contextId, nodeId);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
index d65ae31..a711b73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -49,4 +49,8 @@
         return status;
     }
 
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s, status=%s)", getClass().getSimpleName(), reqId, status);
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
index ac87f7e..ef348fe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -64,16 +64,16 @@
                         final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
 
                         try {
-                            if (LOGGER.isInfoEnabled()) {
-                                LOGGER.log(Level.INFO,
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.log(Level.TRACE,
                                         ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms");
                             }
                             Thread.sleep(time);
                         } catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
                         } finally {
-                            if (LOGGER.isInfoEnabled()) {
-                                LOGGER.log(Level.INFO,
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.log(Level.TRACE,
                                         ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms");
                             }
                         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index b123a5e..ae903d1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -77,6 +77,11 @@
         ensureMaxCapacity();
     }
 
+    @Override
+    public IReadOnlyClusterCapacity getClusterCapacity() {
+        return resourceManager.getCurrentCapacity();
+    }
+
     private void ensureMaxCapacity() {
         final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
         final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 9f4541f5..0c74260 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -74,7 +74,7 @@
     }
 
     protected void flushIfNotFailed() throws HyracksDataException {
-        if (!failed && appender.getTupleCount() > 0) {
+        if (!failed && appender != null && appender.getTupleCount() > 0) {
             flushAndReset();
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index 9e38a20..b18bcb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -24,6 +24,11 @@
 public class DefaultJobCapacityController implements IJobCapacityController {
 
     public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+    private static final IClusterCapacity CAPACITY = new ClusterCapacity();
+    static {
+        CAPACITY.setAggregatedCores(Integer.MAX_VALUE);
+        CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+    }
 
     private DefaultJobCapacityController() {
     }
@@ -37,4 +42,9 @@
     public void release(JobSpecification job) {
         // No operation here.
     }
+
+    @Override
+    public IReadOnlyClusterCapacity getClusterCapacity() {
+        return CAPACITY;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index 5fa4bd9..f88baa2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -57,4 +57,10 @@
      */
     void release(JobSpecification job);
 
+    /**
+     * The cluster current capacity.
+     *
+     * @return the cluster current capacity.
+     */
+    IReadOnlyClusterCapacity getClusterCapacity();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 33bdd05..2347449 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -38,7 +38,9 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -312,6 +314,7 @@
         run.setStartTime(System.currentTimeMillis());
         run.setStartTimeZoneId(ZoneId.systemDefault().getId());
         JobId jobId = run.getJobId();
+        logJobCapacity(run, "running");
         activeRunMap.put(jobId, run);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
@@ -319,6 +322,7 @@
 
     // Queue a job when the required capacity for the job is not met.
     private void queueJob(JobRun jobRun) throws HyracksException {
+        logJobCapacity(jobRun, "queueing");
         jobRun.setStatus(JobStatus.PENDING, null);
         jobQueue.add(jobRun);
     }
@@ -354,5 +358,23 @@
     private void releaseJobCapacity(JobRun jobRun) {
         final JobSpecification job = jobRun.getJobSpecification();
         jobCapacityController.release(job);
+        logJobCapacity(jobRun, "released");
+    }
+
+    private void logJobCapacity(JobRun jobRun, String jobStateDesc) {
+        IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity();
+        if (requiredResources == null) {
+            return;
+        }
+        long requiredMemory = requiredResources.getAggregatedMemoryByteSize();
+        int requiredCPUs = requiredResources.getAggregatedCores();
+        if (requiredMemory == 0 && requiredCPUs == 0) {
+            return;
+        }
+        IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity();
+        LOGGER.info("{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}",
+                jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs,
+                clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(),
+                getRunningJobsCount(), jobQueue.size());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 260c6b9..38277c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -121,4 +121,9 @@
     public void clear() {
         jobListMap.clear();
     }
+
+    @Override
+    public int size() {
+        return jobListMap.size();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index be40883..1f2c29a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -73,4 +73,11 @@
      * Clears the job queue
      */
     void clear();
+
+    /**
+     * Returns the number of queued jobs.
+     *
+     * @return the number of queued jobs.
+     */
+    int size();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 771832e..4f77914 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -34,8 +35,8 @@
 public class ApplicationMessageWork extends AbstractHeartbeatWork {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private byte[] message;
-    private DeploymentId deploymentId;
+    private final byte[] message;
+    private final DeploymentId deploymentId;
 
     public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
             String nodeId) {
@@ -61,6 +62,11 @@
         return getName() + ": nodeID: " + nodeId;
     }
 
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
+
     private static void notifyMessageBroker(ICCServiceContext ctx, IMessage msg, String nodeId) {
         final ExecutorService executor = ctx.getControllerService().getExecutor();
         executor.execute(() -> {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index c36b887..73c0c7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -24,10 +24,11 @@
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
 
 public class GetNodeControllersInfoWork extends AbstractWork {
     private final INodeManager nodeManager;
-    private IResultCallback<Map<String, NodeControllerInfo>> callback;
+    private final IResultCallback<Map<String, NodeControllerInfo>> callback;
 
     public GetNodeControllersInfoWork(INodeManager nodeManager,
             IResultCallback<Map<String, NodeControllerInfo>> callback) {
@@ -39,4 +40,9 @@
     public void run() {
         callback.setValue(nodeManager.getNodeControllerInfoMap());
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index ee10669..cbdf98a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -71,6 +71,6 @@
 
     @Override
     public Level logLevel() {
-        return Level.DEBUG;
+        return Level.TRACE;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index eee8950..e52e3ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -52,7 +52,6 @@
     private boolean failed;
     protected boolean flushRequest;
     private boolean deallocated;
-    private Level openCloseLevel = Level.DEBUG;
     private Thread dataConsumerThread;
 
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -181,9 +180,6 @@
 
     @Override
     public void open() throws HyracksDataException {
-        if (LOGGER.isEnabled(openCloseLevel)) {
-            LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId);
-        }
         size = 0;
         eos = false;
         failed = false;
@@ -215,9 +211,6 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (LOGGER.isEnabled(openCloseLevel)) {
-            LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId);
-        }
         if (writeHandle != null) {
             ctx.getIoManager().close(writeHandle);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 6d4f173..d1360d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -30,10 +30,10 @@
 
 public class ApplicationMessageWork extends AbstractWork {
     private static final Logger LOGGER = LogManager.getLogger();
-    private byte[] message;
-    private DeploymentId deploymentId;
-    private String nodeId;
-    private NodeControllerService ncs;
+    private final byte[] message;
+    private final DeploymentId deploymentId;
+    private final String nodeId;
+    private final NodeControllerService ncs;
 
     public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) {
         this.ncs = ncs;
@@ -62,4 +62,9 @@
     public String toString() {
         return getName() + ": nodeId: " + nodeId;
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 6dd4307..cd79da7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.util.ErrorMessageUtil;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
@@ -65,6 +66,8 @@
 
     @Override
     public String toString() {
-        return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]";
+        return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]"
+                + ((exceptions != null && !exceptions.isEmpty())
+                        ? " " + ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : "");
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 555e8fb..0f31491 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -319,9 +319,6 @@
                         if (!failed) {
                             state.hybridHJ.closeBuild();
                             ctx.setStateObject(state);
-                            if (LOGGER.isTraceEnabled()) {
-                                LOGGER.trace("OptimizedHybridHashJoin closed its build phase");
-                            }
                         } else {
                             state.hybridHJ.clearBuildTempFiles();
                         }
@@ -402,10 +399,6 @@
 
                     writer.open();
                     state.hybridHJ.initProbe(probComp);
-
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
-                    }
                 }
 
                 @Override
@@ -416,7 +409,7 @@
                 @Override
                 public void fail() throws HyracksDataException {
                     failed = true;
-                    if (state.hybridHJ != null) {
+                    if (state != null && state.hybridHJ != null) {
                         state.hybridHJ.fail();
                     }
                     writer.fail();
@@ -427,12 +420,13 @@
                     if (failed) {
                         try {
                             // Clear temp files if fail() was called.
-                            state.hybridHJ.clearBuildTempFiles();
-                            state.hybridHJ.clearProbeTempFiles();
+                            if (state != null && state.hybridHJ != null) {
+                                state.hybridHJ.clearBuildTempFiles();
+                                state.hybridHJ.clearProbeTempFiles();
+                            }
                         } finally {
                             writer.close(); // writer should always be closed.
                         }
-                        logProbeComplete();
                         return;
                     }
                     try {
@@ -477,17 +471,7 @@
                         // Re-throw the whatever is caught.
                         throw e;
                     } finally {
-                        try {
-                            logProbeComplete();
-                        } finally {
-                            writer.close();
-                        }
-                    }
-                }
-
-                private void logProbeComplete() {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("OptimizedHybridHashJoin closed its probe phase");
+                        writer.close();
                     }
                 }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index 08f15b3..a1704ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -41,13 +41,9 @@
 import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory;
 import org.apache.hyracks.dataflow.std.structures.MaxHeap;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class TupleSorterHeapSort implements ITupleSorter {
 
-    private static final Logger LOGGER = LogManager.getLogger();
-
     class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
         @Override
         public IResetableComparable<HeapEntry> createResetableComparable() {
@@ -288,7 +284,6 @@
         int maxFrameSize = outputFrame.getFrameSize();
         int numEntries = heap.getNumEntries();
         IResetableComparable[] entries = heap.getEntries();
-        int io = 0;
         for (int i = 0; i < numEntries; i++) {
             HeapEntry minEntry = (HeapEntry) entries[i];
             bufferAccessor1.reset(minEntry.tuplePointer);
@@ -296,14 +291,10 @@
                     bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
             if (flushed > 0) {
                 maxFrameSize = Math.max(maxFrameSize, flushed);
-                io++;
             }
         }
         maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
         outputAppender.write(writer, true);
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
-        }
         return maxFrameSize;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index be22b9c..7a75a0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -34,7 +34,9 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.api.result.IResultSetReader;
 import org.apache.hyracks.client.result.ResultSet;
@@ -254,6 +256,14 @@
                 public void release(JobSpecification job) {
 
                 }
+
+                @Override
+                public IReadOnlyClusterCapacity getClusterCapacity() {
+                    ClusterCapacity clusterCapacity = new ClusterCapacity();
+                    clusterCapacity.setAggregatedMemoryByteSize(maxRAM);
+                    clusterCapacity.setAggregatedCores(Integer.MAX_VALUE);
+                    return clusterCapacity;
+                }
             };
         }
     }