[NO ISSUE][RT] Improve logging in task related works

Change-Id: Ia4e24a95aaac37b1d3d0d9a35266109ae0315293
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2803
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index b6b3e40..7dc636c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -221,17 +221,17 @@
         JobId jobId = run.getJobId();
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
-        if (serviceCtx != null) {
-            try {
-                serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
-            } catch (Exception e) {
-                LOGGER.error("Exception notifying job finish {}", jobId, e);
-                caughtException = e;
-            }
+        try {
+            serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+        } catch (Exception e) {
+            LOGGER.error("Exception notifying job finish {}", jobId, e);
+            caughtException = e;
         }
         run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
         run.setEndTime(System.currentTimeMillis());
-        activeRunMap.remove(jobId);
+        if (activeRunMap.remove(jobId) == null) {
+            LOGGER.warn("Job {} was not found running but is getting archived and capacity released", jobId);
+        }
         runMapArchive.put(jobId, run);
         runMapHistory.put(jobId, run.getExceptions());
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index d7b930c..23a5abb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -290,7 +290,7 @@
                     .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
             return;
         }
-        ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
+        ct.setName(displayName + ":" + joblet.getJobId() + ":" + taskAttemptId + ":" + 0);
         try {
             Throwable operatorException = null;
             try {
@@ -469,4 +469,10 @@
     public boolean isCompleted() {
         return completed;
     }
+
+    @Override
+    public String toString() {
+        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"node\" : \"" + ncs.getId() + "\" \"jobId\" : \""
+                + joblet.getJobId() + "\", \"taskId\" : \"" + taskAttemptId + "\" }";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 614a9e0..554c660 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -51,6 +51,7 @@
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
+        return getName() + ": [" + ncs.getId() + "[" + task.getJoblet().getJobId() + ":" + task.getTaskAttemptId()
+                + "]";
     }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index f0b68a0..ac80afa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -64,4 +64,9 @@
             task.getJoblet().removeTask(task);
         }
     }
+
+    @Override
+    public String toString() {
+        return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 0efed6f..bf9f575 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -27,8 +27,11 @@
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final IInputChannel channel;
 
     private int availableFrames;
@@ -59,6 +62,7 @@
         if (failed) {
             // Do not throw exception here to allow the root cause exception gets propagated to the master first.
             // Return false to allow the nextFrame(...) call to be a non-op.
+            LOGGER.warn("Sender failed.. returning silently");
             return false;
         }
         if (availableFrames <= 0 && eos) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index b6f7cad..3c0a06b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -106,6 +106,7 @@
                 return lastReadSender;
             }
             if (!failSenders.isEmpty()) {
+                LOGGER.warn("Sender failed.. returning silently");
                 // Do not throw exception here to allow the root cause exception gets propagated to the master first.
                 // Return a negative value to allow the nextFrame(...) call to be a non-op.
                 return -1;
@@ -143,10 +144,8 @@
     public synchronized void notifyFailure(IInputChannel channel) {
         PartitionId pid = (PartitionId) channel.getAttachment();
         int senderIndex = pid.getSenderIndex();
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Failure: " + pid.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: "
-                    + pid.getReceiverIndex());
-        }
+        LOGGER.warn("Failure: " + pid.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: "
+                + pid.getReceiverIndex());
         failSenders.set(senderIndex);
         eosSenders.set(senderIndex);
         notifyAll();