Merge branch 'trinity' into 'master'
Change-Id: I6b458d2761d490355dde6e4238a15bd42a1e0d9d
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 84a6488..f3e5268 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -94,6 +94,15 @@
@Override
public ObjectNode asJson() {
+ return putJson();
+ }
+
+ @Override
+ public ObjectNode asRedactedJson() {
+ return putJson();
+ }
+
+ private ObjectNode putJson() {
ObjectNode json = JSONUtil.createObject();
json.put("uuid", requestReference.getUuid());
json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 9f2cbdd..31f1979 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -95,11 +96,21 @@
@Override
public ObjectNode asJson() {
ObjectNode json = super.asJson();
- putJobDetails(json);
- json.put("statement", statement);
+ return asJson(json, false);
+ }
+
+ @Override
+ public ObjectNode asRedactedJson() {
+ ObjectNode json = super.asRedactedJson();
+ return asJson(json, true);
+ }
+
+ private ObjectNode asJson(ObjectNode json, boolean redact) {
+ putJobDetails(json, redact);
+ json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement);
json.put("clientContextID", clientContextId);
if (plan != null) {
- json.put("plan", plan);
+ json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
}
return json;
}
@@ -132,16 +143,16 @@
return ExceptionUtils.unwrap(e).getMessage();
}
- private void putJobDetails(ObjectNode json) {
+ private void putJobDetails(ObjectNode json, boolean redact) {
try {
json.put("jobId", jobId != null ? jobId.toString() : null);
- putJobState(json, jobState);
+ putJobState(json, jobState, redact);
} catch (Throwable th) {
// ignore
}
}
- private static void putJobState(ObjectNode json, JobState state) {
+ private static void putJobState(ObjectNode json, JobState state, boolean redact) {
AMutableDateTime dateTime = new AMutableDateTime(0);
putTime(json, state.createTime, "jobCreateTime", dateTime);
putTime(json, state.startTime, "jobStartTime", dateTime);
@@ -152,7 +163,7 @@
json.put("jobRequiredCPUs", state.requiredCPUs);
json.put("jobRequiredMemory", state.requiredMemoryInBytes);
if (state.errorMsg != null) {
- json.put("error", state.errorMsg);
+ json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
index 285c4c8..b6c6a71 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -35,6 +35,7 @@
public abstract class AbstractRequestsServlet extends AbstractServlet {
+ public static final String REDACT_PARAM = "redact";
protected final ICcApplicationContext appCtx;
public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
@@ -46,8 +47,15 @@
protected void get(IServletRequest request, IServletResponse response) throws Exception {
ArrayNode requestsJson = JSONUtil.createArray();
Collection<IClientRequest> requests = getRequests();
- for (IClientRequest req : requests) {
- requestsJson.add(req.asJson());
+ String redact = request.getParameter(REDACT_PARAM);
+ if (Boolean.parseBoolean(redact)) {
+ for (IClientRequest req : requests) {
+ requestsJson.add(req.asRedactedJson());
+ }
+ } else {
+ for (IClientRequest req : requests) {
+ requestsJson.add(req.asJson());
+ }
}
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
response.setStatus(HttpResponseStatus.OK);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index d0a2e74..a201ec8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -140,6 +140,7 @@
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
listener.setState(ActivityState.RECOVERING);
listener.doRecover(metadataProvider);
+ listener.setRunning(metadataProvider, true);
}
LOGGER.log(level, "Recovery completed successfully");
return null;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e..07f8eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
*/
-- param client_context_id=ensure_running_query
-- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst
WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
new file mode 100644
index 0000000..de35939
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b..170838e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000..15affd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+ "cancellable": true,
+ "clientContextID": "completed_requests_query",
+ "elapsedTime": "R{.*}",
+ "jobCreateTime": "R{.*}",
+ "jobEndTime": "R{.*}",
+ "jobId": "R{.*}",
+ "jobQueueTime": "R{.*}",
+ "jobRequiredCPUs": "R{.*}",
+ "jobRequiredMemory": "R{.*}",
+ "jobStartTime": "R{.*}",
+ "jobStatus": "TERMINATED",
+ "node": "R{.*}",
+ "remoteAddr": "R{.*}",
+ "requestTime": "R{.*}",
+ "state": "completed",
+ "userAgent": "R{.*}",
+ "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 8d2c91a..ee518dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -104,6 +104,11 @@
ObjectNode asJson();
/**
+ * @return A redacted json node representation of this request
+ */
+ ObjectNode asRedactedJson();
+
+ /**
* Called when the job is created.
*
* @param jobId the job id
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 5771201..9875651 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -18,9 +18,9 @@
*/
package org.apache.asterix.runtime.utils;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -30,7 +30,6 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
@@ -42,14 +41,22 @@
private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
- private final CircularFifoQueue<IClientRequest> completedRequests;
+ private final Map<String, IClientRequest> completedRequests;
private final ICcApplicationContext ccAppCtx;
private final AtomicLong numRequests;
private final AtomicLong numOfFailedRequests;
public RequestTracker(ICcApplicationContext ccAppCtx) {
this.ccAppCtx = ccAppCtx;
- completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+ int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+ completedRequests = new LinkedHashMap<>(archiveSize) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) {
+ return size() > archiveSize;
+ }
+ };
numRequests = new AtomicLong(0);
numOfFailedRequests = new AtomicLong(0);
}
@@ -102,7 +109,8 @@
@Override
public synchronized Collection<IClientRequest> getCompletedRequests() {
- return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+ // copy while holding the lock: values() is a live view that archive() keeps mutating
+ return List.copyOf(completedRequests.values());
}
private void cancel(IClientRequest request) throws HyracksDataException {
@@ -122,7 +129,7 @@
}
private synchronized void archive(IClientRequest request) {
- completedRequests.add(request);
+ completedRequests.put(request.getId(), request);
}
public long getTotalNumberOfRequests() {
@@ -139,13 +146,14 @@
return numOfFailedRequests.get();
}
+ @Override
public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
throws HyracksException {
String requestId = spec.getRequestId();
if (requestId != null) {
- IClientRequest clientRequest = runningRequests.get(requestId);
- if (clientRequest != null) {
- clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
}
}
}
@@ -154,9 +162,9 @@
public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
String requestId = spec.getRequestId();
if (requestId != null) {
- IClientRequest clientRequest = runningRequests.get(requestId);
- if (clientRequest != null) {
- clientRequest.jobStarted(jobId);
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobStarted(jobId);
}
}
}
@@ -166,10 +174,20 @@
throws HyracksException {
String requestId = spec.getRequestId();
if (requestId != null) {
- IClientRequest clientRequest = runningRequests.get(requestId);
- if (clientRequest != null) {
- clientRequest.jobFinished(jobId, jobStatus, exceptions);
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobFinished(jobId, jobStatus, exceptions);
}
}
}
+
+ private IClientRequest getRequest(String requestId) {
+ final IClientRequest running = runningRequests.get(requestId);
+ if (running == null) {
+ synchronized (this) { // the completed-requests map is only accessed under this monitor
+ return completedRequests.get(requestId);
+ }
+ }
+ return running;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 1b0f377..b476993 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@
private void abortTaskCluster(TaskClusterAttempt tcAttempt,
TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
- LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+ LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+ LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
INodeManager nodeManager = ccs.getNodeManager();
abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
final NodeControllerState node = nodeManager.getNodeControllerState(key);
if (node != null) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key);
- }
+ LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
try {
node.getNodeController().abortTasks(jobId, abortTaskAttempts);
} catch (Exception e) {
@@ -579,8 +577,8 @@
inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
TaskCluster tc = tcAttempt.getTaskCluster();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
- pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
- pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+ pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds, jobId);
+ pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds, jobId);
tcAttempt.setStatus(failedOrAbortedStatus);
tcAttempt.setEndTime(System.currentTimeMillis());
@@ -683,7 +681,6 @@
*/
public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
- LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@
LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId()
+ " as failed and the number of max re-attempts = " + maxReattempts);
if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
- LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId());
+ LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId());
abortJob(exceptions, NoOpCallback.INSTANCE);
return;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 9165953..3a954f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -145,7 +145,7 @@
if (activeRunMap.containsKey(jobId)) {
JobRun jobRun = activeRunMap.get(jobId);
// The following call will abort all ongoing tasks and then consequently
- // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
+ // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
incrementCancelledJobs();
@@ -157,7 +157,7 @@
incrementCancelledJobs();
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
- // Since the job has not been executed, we only need to update its status and lifecyle here.
+ // Since the job has not been executed, we only need to update its status and lifecycle here.
jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
@@ -189,7 +189,6 @@
return;
}
if (run.getPendingStatus() != null) {
- LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
return;
}
Set<String> targetNodes = run.getParticipatingNodeIds();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b53..8f91944 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -28,6 +28,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -43,14 +44,13 @@
private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
public PartitionMatchMaker() {
- partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>();
- partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+ partitionDescriptors = new HashMap<>();
+ partitionRequests = new HashMap<>();
}
public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor(
PartitionDescriptor partitionDescriptor) {
- List<Pair<PartitionDescriptor, PartitionRequest>> matches =
- new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+ List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<>();
PartitionId pid = partitionDescriptor.getPartitionId();
boolean matched = false;
List<PartitionRequest> requests = partitionRequests.get(pid);
@@ -73,11 +73,7 @@
}
if (!matched) {
- List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
- if (descriptors == null) {
- descriptors = new ArrayList<PartitionDescriptor>();
- partitionDescriptors.put(pid, descriptors);
- }
+ List<PartitionDescriptor> descriptors = partitionDescriptors.computeIfAbsent(pid, k -> new ArrayList<>());
descriptors.add(partitionDescriptor);
}
@@ -108,11 +104,7 @@
}
if (match == null) {
- List<PartitionRequest> requests = partitionRequests.get(pid);
- if (requests == null) {
- requests = new ArrayList<PartitionRequest>();
- partitionRequests.put(pid, requests);
- }
+ List<PartitionRequest> requests = partitionRequests.computeIfAbsent(pid, k -> new ArrayList<>());
requests.add(partitionRequest);
}
@@ -133,17 +125,11 @@
}
private interface IEntryFilter<T> {
- public boolean matches(T o);
+ boolean matches(T o);
}
private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) {
- Iterator<T> j = list.iterator();
- while (j.hasNext()) {
- T o = j.next();
- if (filter.matches(o)) {
- j.remove();
- }
- }
+ list.removeIf(filter::matches);
}
private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) {
@@ -159,30 +145,16 @@
}
public void notifyNodeFailures(final Collection<String> deadNodes) {
- removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
- @Override
- public boolean matches(PartitionDescriptor o) {
- return deadNodes.contains(o.getNodeId());
- }
- });
- removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
- @Override
- public boolean matches(PartitionRequest o) {
- return deadNodes.contains(o.getNodeId());
- }
- });
+ removeEntries(partitionDescriptors, o -> deadNodes.contains(o.getNodeId()));
+ removeEntries(partitionRequests, o -> deadNodes.contains(o.getNodeId()));
}
- public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+ public void removeUncommittedPartitions(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
+ if (!partitionIds.isEmpty()) {
+ LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, partitionIds);
}
- IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
- @Override
- public boolean matches(PartitionDescriptor o) {
- return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
- }
- };
+ IEntryFilter<PartitionDescriptor> filter =
+ o -> o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
for (PartitionId pid : partitionIds) {
List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
if (descriptors != null) {
@@ -194,16 +166,11 @@
}
}
- public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing partition requests: " + partitionIds);
+ public void removePartitionRequests(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
+ if (!partitionIds.isEmpty()) {
+ LOGGER.debug("Removing partition requests {}: {}", jobId, partitionIds);
}
- IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
- @Override
- public boolean matches(PartitionRequest o) {
- return taIds.contains(o.getRequestingTaskAttemptId());
- }
- };
+ IEntryFilter<PartitionRequest> filter = o -> taIds.contains(o.getRequestingTaskAttemptId());
for (PartitionId pid : partitionIds) {
List<PartitionRequest> requests = partitionRequests.get(pid);
if (requests != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index a0c2ce4..f56ec33 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -46,7 +46,6 @@
import org.apache.hyracks.control.common.result.AbstractResultManager;
import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -81,9 +80,7 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
- }
+ LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId);
if (jobResultLocations.get(jobId) != null) {
throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
}
@@ -160,15 +157,14 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
- Level logLevel = Level.DEBUG;
- if (LOGGER.isEnabled(logLevel)) {
- LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex);
- }
ResultJobRecord rjr = getResultJobRecord(jobId);
+ if (logFailure(rjr)) {
+ LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName());
+ }
if (rjr != null) {
rjr.fail(exceptions);
}
+ Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
if (jobResultInfo != null) {
jobResultInfo.setException(ex);
@@ -214,6 +210,15 @@
}
}
+ /**
+ * Tells whether a failure report for the given record should be logged.
+ * Returns false only when the record's status is already in the FAILED state.
+ */
+ private static boolean logFailure(ResultJobRecord rjr) {
+ ResultJobRecord.Status status = rjr == null ? null : rjr.getStatus();
+ return status == null || status.getState() != State.FAILED;
+ }
+
/**
* Compares the records already known by the client for the given job's result set id with the records that the
* result directory service knows and if there are any newly discovered records returns a whole array with the
@@ -267,7 +272,7 @@
class JobResultInfo {
- private ResultJobRecord record;
+ private final ResultJobRecord record;
private Waiters waiters;
private Exception exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 6262c47..f065940 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -52,7 +52,6 @@
@Override
public void run() {
- LOGGER.info("cleaning up {} on NCs, status={}", jobId, status);
final JobRun jobRun = jobManager.get(jobId);
if (jobRun == null) {
LOGGER.debug("ignoring cleanup for unknown {}", jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066e..33d391f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,13 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
- private static final Logger LOGGER = LogManager.getLogger();
+
private final List<Exception> exceptions;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -43,9 +39,6 @@
@Override
protected void performEvent(TaskAttempt ta) {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN,
- "Executing task failure work for " + this, ex);
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
if (run == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index a2e43a2f..7f2d78b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -148,7 +148,7 @@
if (configured) {
throw new IllegalStateException("configuration already processed");
}
- LOGGER.debug("registering option: " + option.toIniString());
+ LOGGER.trace("registering option: {}", option::toIniString);
Map<String, IOption> optionMap = sectionMap.computeIfAbsent(option.section(), section -> new HashMap<>());
IOption prev = optionMap.put(option.ini(), option);
if (prev != null) {
@@ -160,8 +160,13 @@
registeredOptions.add(option);
optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
if (LOGGER.isDebugEnabled()) {
- optionSetters.put(option, (node, value, isDefault) -> LOGGER.debug("{} {} to {} for node {}",
- isDefault ? "defaulting" : "setting", option.toIniString(), value, node));
+ optionSetters.put(option, (node, value, isDefault) -> {
+ if (isDefault) {
+ LOGGER.trace("defaulting {} to {} for node {}", option.toIniString(), value, node);
+ } else {
+ LOGGER.debug("setting {} to {} for node {}", option.toIniString(), value, node);
+ }
+ });
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index d1f7d5a..c99898d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -70,6 +70,8 @@
InvokeUtil.runWithTimeout(() -> {
this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while loop in timeout call
}, () -> !registrationPending, 1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
registrationException = e;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa..6dd4307 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,6 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
@@ -50,9 +49,6 @@
@Override
public void run() {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, "task " + taskId + " has failed",
- ex);
try {
IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
if (resultPartitionManager != null) {