Merge branch 'trinity' into 'master'

Change-Id: I6b458d2761d490355dde6e4238a15bd42a1e0d9d
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 84a6488..f3e5268 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -94,6 +94,15 @@
 
     @Override
     public ObjectNode asJson() {
+        return putJson();
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        return putJson();
+    }
+
+    private ObjectNode putJson() {
         ObjectNode json = JSONUtil.createObject();
         json.put("uuid", requestReference.getUuid());
         json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 9f2cbdd..31f1979 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -95,11 +96,21 @@
     @Override
     public ObjectNode asJson() {
         ObjectNode json = super.asJson();
-        putJobDetails(json);
-        json.put("statement", statement);
+        return asJson(json, false);
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        // seed with the parent's redacted fields, then add this request's details with redaction on
+        return asJson(super.asRedactedJson(), true);
+    }
+
+    private ObjectNode asJson(ObjectNode json, boolean redact) {
+        putJobDetails(json, redact);
+        json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement);
         json.put("clientContextID", clientContextId);
         if (plan != null) {
-            json.put("plan", plan);
+            json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
         }
         return json;
     }
@@ -132,16 +143,16 @@
         return ExceptionUtils.unwrap(e).getMessage();
     }
 
-    private void putJobDetails(ObjectNode json) {
+    private void putJobDetails(ObjectNode json, boolean redact) {
         try {
             json.put("jobId", jobId != null ? jobId.toString() : null);
-            putJobState(json, jobState);
+            putJobState(json, jobState, redact);
         } catch (Throwable th) {
             // ignore
         }
     }
 
-    private static void putJobState(ObjectNode json, JobState state) {
+    private static void putJobState(ObjectNode json, JobState state, boolean redact) {
         AMutableDateTime dateTime = new AMutableDateTime(0);
         putTime(json, state.createTime, "jobCreateTime", dateTime);
         putTime(json, state.startTime, "jobStartTime", dateTime);
@@ -152,7 +163,7 @@
         json.put("jobRequiredCPUs", state.requiredCPUs);
         json.put("jobRequiredMemory", state.requiredMemoryInBytes);
         if (state.errorMsg != null) {
-            json.put("error", state.errorMsg);
+            json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
index 285c4c8..b6c6a71 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -35,6 +35,7 @@
 
 public abstract class AbstractRequestsServlet extends AbstractServlet {
 
+    public static final String REDACT_PARAM = "redact";
     protected final ICcApplicationContext appCtx;
 
     public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
@@ -46,8 +47,15 @@
     protected void get(IServletRequest request, IServletResponse response) throws Exception {
         ArrayNode requestsJson = JSONUtil.createArray();
         Collection<IClientRequest> requests = getRequests();
-        for (IClientRequest req : requests) {
-            requestsJson.add(req.asJson());
+        String redact = request.getParameter(REDACT_PARAM);
+        if (Boolean.parseBoolean(redact)) {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asRedactedJson());
+            }
+        } else {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asJson());
+            }
         }
         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
         response.setStatus(HttpResponseStatus.OK);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index d0a2e74..a201ec8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -140,6 +140,7 @@
                         if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
                             listener.setState(ActivityState.RECOVERING);
                             listener.doRecover(metadataProvider);
+                            listener.setRunning(metadataProvider, true);
                         }
                         LOGGER.log(level, "Recovery completed successfully");
                         return null;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e..07f8eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
  */
  -- param client_context_id=ensure_running_query
  -- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst
 WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
new file mode 100644
index 0000000..de35939
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b..170838e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000..15affd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+    "cancellable": true,
+    "clientContextID": "completed_requests_query",
+    "elapsedTime": "R{.*}",
+    "jobCreateTime": "R{.*}",
+    "jobEndTime": "R{.*}",
+    "jobId": "R{.*}",
+    "jobQueueTime": "R{.*}",
+    "jobRequiredCPUs": "R{.*}",
+    "jobRequiredMemory": "R{.*}",
+    "jobStartTime": "R{.*}",
+    "jobStatus": "TERMINATED",
+    "node": "R{.*}",
+    "remoteAddr": "R{.*}",
+    "requestTime": "R{.*}",
+    "state": "completed",
+    "userAgent": "R{.*}",
+    "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 8d2c91a..ee518dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -104,6 +104,11 @@
     ObjectNode asJson();
 
     /**
+     * @return A redacted json node representation of this request
+     */
+    ObjectNode asRedactedJson();
+
+    /**
      * Called when the job is created.
      *
      * @param jobId the job id
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 5771201..9875651 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.runtime.utils;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -30,7 +30,6 @@
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
@@ -42,14 +41,22 @@
 
     private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
     private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
-    private final CircularFifoQueue<IClientRequest> completedRequests;
+    private final Map<String, IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
     private final AtomicLong numOfFailedRequests;
 
     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
-        completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+        int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+        completedRequests = new LinkedHashMap<>(archiveSize) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) {
+                return size() > archiveSize;
+            }
+        };
         numRequests = new AtomicLong(0);
         numOfFailedRequests = new AtomicLong(0);
     }
@@ -102,7 +109,7 @@
 
     @Override
     public synchronized Collection<IClientRequest> getCompletedRequests() {
-        return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+        return List.copyOf(completedRequests.values()); // copy under lock: a live view would be read unsynchronized
     }
 
     private void cancel(IClientRequest request) throws HyracksDataException {
@@ -122,7 +129,7 @@
     }
 
     private synchronized void archive(IClientRequest request) {
-        completedRequests.add(request);
+        completedRequests.put(request.getId(), request);
     }
 
     public long getTotalNumberOfRequests() {
@@ -139,13 +146,14 @@
         return numOfFailedRequests.get();
     }
 
+    @Override
     public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
             throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
             }
         }
     }
@@ -154,9 +162,9 @@
     public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobStarted(jobId);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobStarted(jobId);
             }
         }
     }
@@ -166,10 +174,20 @@
             throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobFinished(jobId, jobStatus, exceptions);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobFinished(jobId, jobStatus, exceptions);
             }
         }
     }
+
+    private IClientRequest getRequest(String requestId) {
+        IClientRequest running = runningRequests.get(requestId);
+        if (running == null) {
+            synchronized (this) {
+                return completedRequests.get(requestId); // not running: fall back to the archived requests
+            }
+        }
+        return running;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 1b0f377..b476993 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt,
             TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
-        LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+        LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
         for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@
             }
         }
         final JobId jobId = jobRun.getJobId();
-        LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+        LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
         INodeManager nodeManager = ccs.getNodeManager();
         abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
             final NodeControllerState node = nodeManager.getNodeControllerState(key);
             if (node != null) {
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key);
-                }
+                LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
                 try {
                     node.getNodeController().abortTasks(jobId, abortTaskAttempts);
                 } catch (Exception e) {
@@ -579,8 +577,8 @@
         inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
         TaskCluster tc = tcAttempt.getTaskCluster();
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
-        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds, jobId);
+        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds, jobId);
 
         tcAttempt.setStatus(failedOrAbortedStatus);
         tcAttempt.setEndTime(System.currentTimeMillis());
@@ -683,7 +681,6 @@
      */
     public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
         try {
-            LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTask().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@
                 LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId()
                         + " as failed and the number of max re-attempts = " + maxReattempts);
                 if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
-                    LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId());
+                    LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId());
                     abortJob(exceptions, NoOpCallback.INSTANCE);
                     return;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 9165953..3a954f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -145,7 +145,7 @@
         if (activeRunMap.containsKey(jobId)) {
             JobRun jobRun = activeRunMap.get(jobId);
             // The following call will abort all ongoing tasks and then consequently
-            // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
+            // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
             jobRun.getExecutor().cancelJob(callback);
             incrementCancelledJobs();
@@ -157,7 +157,7 @@
             incrementCancelledJobs();
             List<Exception> exceptions =
                     Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
-            // Since the job has not been executed, we only need to update its status and lifecyle here.
+            // Since the job has not been executed, we only need to update its status and lifecycle here.
             jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
@@ -189,7 +189,6 @@
             return;
         }
         if (run.getPendingStatus() != null) {
-            LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
             return;
         }
         Set<String> targetNodes = run.getParticipatingNodeIds();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b53..8f91944 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -28,6 +28,7 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -43,14 +44,13 @@
     private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
 
     public PartitionMatchMaker() {
-        partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>();
-        partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+        partitionDescriptors = new HashMap<>();
+        partitionRequests = new HashMap<>();
     }
 
     public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor(
             PartitionDescriptor partitionDescriptor) {
-        List<Pair<PartitionDescriptor, PartitionRequest>> matches =
-                new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+        List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<>();
         PartitionId pid = partitionDescriptor.getPartitionId();
         boolean matched = false;
         List<PartitionRequest> requests = partitionRequests.get(pid);
@@ -73,11 +73,7 @@
         }
 
         if (!matched) {
-            List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
-            if (descriptors == null) {
-                descriptors = new ArrayList<PartitionDescriptor>();
-                partitionDescriptors.put(pid, descriptors);
-            }
+            List<PartitionDescriptor> descriptors = partitionDescriptors.computeIfAbsent(pid, k -> new ArrayList<>());
             descriptors.add(partitionDescriptor);
         }
 
@@ -108,11 +104,7 @@
         }
 
         if (match == null) {
-            List<PartitionRequest> requests = partitionRequests.get(pid);
-            if (requests == null) {
-                requests = new ArrayList<PartitionRequest>();
-                partitionRequests.put(pid, requests);
-            }
+            List<PartitionRequest> requests = partitionRequests.computeIfAbsent(pid, k -> new ArrayList<>());
             requests.add(partitionRequest);
         }
 
@@ -133,17 +125,11 @@
     }
 
     private interface IEntryFilter<T> {
-        public boolean matches(T o);
+        boolean matches(T o);
     }
 
     private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) {
-        Iterator<T> j = list.iterator();
-        while (j.hasNext()) {
-            T o = j.next();
-            if (filter.matches(o)) {
-                j.remove();
-            }
-        }
+        list.removeIf(filter::matches);
     }
 
     private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) {
@@ -159,30 +145,16 @@
     }
 
     public void notifyNodeFailures(final Collection<String> deadNodes) {
-        removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
-            @Override
-            public boolean matches(PartitionDescriptor o) {
-                return deadNodes.contains(o.getNodeId());
-            }
-        });
-        removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
-            @Override
-            public boolean matches(PartitionRequest o) {
-                return deadNodes.contains(o.getNodeId());
-            }
-        });
+        removeEntries(partitionDescriptors, o -> deadNodes.contains(o.getNodeId()));
+        removeEntries(partitionRequests, o -> deadNodes.contains(o.getNodeId()));
     }
 
-    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
+        if (!partitionIds.isEmpty()) {
+            LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, partitionIds);
         }
-        IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
-            @Override
-            public boolean matches(PartitionDescriptor o) {
-                return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
-            }
-        };
+        IEntryFilter<PartitionDescriptor> filter =
+                o -> o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
         for (PartitionId pid : partitionIds) {
             List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
             if (descriptors != null) {
@@ -194,16 +166,11 @@
         }
     }
 
-    public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Removing partition requests: " + partitionIds);
+    public void removePartitionRequests(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
+        if (!partitionIds.isEmpty()) {
+            LOGGER.debug("Removing partition requests {}: {}", jobId, partitionIds);
         }
-        IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
-            @Override
-            public boolean matches(PartitionRequest o) {
-                return taIds.contains(o.getRequestingTaskAttemptId());
-            }
-        };
+        IEntryFilter<PartitionRequest> filter = o -> taIds.contains(o.getRequestingTaskAttemptId());
         for (PartitionId pid : partitionIds) {
             List<PartitionRequest> requests = partitionRequests.get(pid);
             if (requests != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index a0c2ce4..f56ec33 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -46,7 +46,6 @@
 import org.apache.hyracks.control.common.result.AbstractResultManager;
 import org.apache.hyracks.control.common.result.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -81,9 +80,7 @@
     @Override
     public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
             IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
-        }
+        LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId);
         if (jobResultLocations.get(jobId) != null) {
             throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
         }
@@ -160,15 +157,14 @@
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
-        Level logLevel = Level.DEBUG;
-        if (LOGGER.isEnabled(logLevel)) {
-            LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex);
-        }
         ResultJobRecord rjr = getResultJobRecord(jobId);
+        if (logFailure(rjr)) {
+            LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName());
+        }
         if (rjr != null) {
             rjr.fail(exceptions);
         }
+        Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
         final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
         if (jobResultInfo != null) {
             jobResultInfo.setException(ex);
@@ -214,6 +210,15 @@
         }
     }
 
+    private static boolean logFailure(ResultJobRecord rjr) {
+        // log unless the record is already marked failed, so a repeated report is not logged again
+        ResultJobRecord.Status current = rjr == null ? null : rjr.getStatus();
+        if (current == null) {
+            return true;
+        }
+        return current.getState() != State.FAILED;
+    }
+
     /**
      * Compares the records already known by the client for the given job's result set id with the records that the
      * result directory service knows and if there are any newly discovered records returns a whole array with the
@@ -267,7 +272,7 @@
 
 class JobResultInfo {
 
-    private ResultJobRecord record;
+    private final ResultJobRecord record;
     private Waiters waiters;
     private Exception exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 6262c47..f065940 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -52,7 +52,6 @@
 
     @Override
     public void run() {
-        LOGGER.info("cleaning up {} on NCs, status={}", jobId, status);
         final JobRun jobRun = jobManager.get(jobId);
         if (jobRun == null) {
             LOGGER.debug("ignoring cleanup for unknown {}", jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066e..33d391f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,13 @@
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
-    private static final Logger LOGGER = LogManager.getLogger();
+
     private final List<Exception> exceptions;
 
     public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -43,9 +39,6 @@
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        Exception ex = exceptions.get(0);
-        LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN,
-                "Executing task failure work for " + this, ex);
         IJobManager jobManager = ccs.getJobManager();
         JobRun run = jobManager.get(jobId);
         if (run == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index a2e43a2f..7f2d78b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -148,7 +148,7 @@
             if (configured) {
                 throw new IllegalStateException("configuration already processed");
             }
-            LOGGER.debug("registering option: " + option.toIniString());
+            LOGGER.trace("registering option: {}", option::toIniString);
             Map<String, IOption> optionMap = sectionMap.computeIfAbsent(option.section(), section -> new HashMap<>());
             IOption prev = optionMap.put(option.ini(), option);
             if (prev != null) {
@@ -160,8 +160,13 @@
                 registeredOptions.add(option);
                 optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
                 if (LOGGER.isDebugEnabled()) {
-                    optionSetters.put(option, (node, value, isDefault) -> LOGGER.debug("{} {} to {} for node {}",
-                            isDefault ? "defaulting" : "setting", option.toIniString(), value, node));
+                    optionSetters.put(option, (node, value, isDefault) -> {
+                        if (isDefault) {
+                            LOGGER.trace("defaulting {} to {} for node {}", option.toIniString(), value, node);
+                        } else {
+                            LOGGER.debug("setting {} to {} for node {}", option.toIniString(), value, node);
+                        }
+                    });
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index d1f7d5a..c99898d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -70,6 +70,8 @@
             InvokeUtil.runWithTimeout(() -> {
                 this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while loop in timeout call
             }, () -> !registrationPending, 1, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            throw e;
         } catch (Exception e) {
             registrationException = e;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa..6dd4307 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,6 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
@@ -50,9 +49,6 @@
 
     @Override
     public void run() {
-        Exception ex = exceptions.get(0);
-        LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, "task " + taskId + " has failed",
-                ex);
         try {
             IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
             if (resultPartitionManager != null) {