[NO ISSUE][OTH] Logging enhancements + query job logging
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Reduce some JobWork logging level to TRACE.
- Remove not useful logs.
- Add context to cancel request/response messages.
- Avoid unnecessary NPEs when closing the pipeline causing
not needed stack traces in logs.
- Add method to get Job Queue size.
- Add method to IJobCapacityController to get cluster
current capacity for logging.
This patch includes backports from:
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18016
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237
Change-Id: I0389e693493d99a12483b94c50eeb1697f69515f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18353
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index b99e4f2..ed481bc 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -104,9 +104,9 @@
}
public void submit(ActiveManagerMessage message) throws HyracksDataException {
- LOGGER.debug("Message of type {} received in {}", message.getKind(), nodeId);
switch (message.getKind()) {
case STOP_ACTIVITY:
+ LOGGER.debug("Message of type {} received in {}", message.getKind(), nodeId);
stopRuntime(message);
break;
case REQUEST_STATS:
@@ -151,7 +151,6 @@
return;
}
String stats = runtime.getStats();
- LOGGER.debug("Sending stats response for {} ", runtimeId);
ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
((NodeControllerService) serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC(
message.getCcId(), JavaSerializationUtils.serialize(response), null);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 626b938..7b253f1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -138,7 +138,7 @@
}
protected synchronized void setState(ActivityState newState) {
- LOGGER.log(level, "State of {} is being set to {} from {}", getEntityId(), newState, state);
+ LOGGER.log(level, "state of {} is being set from {} to {}", getEntityId(), state, newState);
this.prevState = state;
this.state = newState;
if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING
@@ -153,9 +153,8 @@
@Override
public synchronized void notify(ActiveEvent event) {
try {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "EventListener is notified.");
- }
+ LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState,
+ suspended);
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
@@ -194,26 +193,21 @@
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Active job {} finished", jobId);
- }
JobId lastJobId = jobId;
+ Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
if (numRegistered != numDeRegistered) {
LOGGER.log(Level.WARN,
- "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
- numRegistered, numDeRegistered);
+ "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}",
+ jobId, status, numRegistered, numDeRegistered);
}
jobId = null;
- Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus);
- }
+ LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus);
if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
- LOGGER.error("Active Job {} failed", lastJobId, jobFailure);
+ LOGGER.error("ingestion job {} failed", lastJobId, jobFailure);
setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED
: ActivityState.TEMPORARILY_FAILED);
if (prevState == ActivityState.RUNNING) {
@@ -371,16 +365,14 @@
@Override
public synchronized void recover() {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Recover is called on {}", entityId);
- }
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
+ LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId);
setState(ActivityState.STOPPED);
} else {
+ LOGGER.debug("recover is called on {}", entityId);
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
setState(ActivityState.TEMPORARILY_FAILED);
- LOGGER.log(level, "Recovery task has been submitted");
+ LOGGER.debug("recovery task has been submitted");
rt = createRecoveryTask();
executor.submit(rt.recover());
}
@@ -479,15 +471,11 @@
// Note: once we start sending stop messages, we can't go back until the entity is stopped
final String nameBefore = Thread.currentThread().getName();
try {
- Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+ Thread.currentThread().setName(nameBefore + " : wait-for-ingestion-completion: " + jobId);
sendStopMessages(metadataProvider, timeout, unit);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Waiting for its state to become " + waitFor);
- }
+ LOGGER.debug("waiting for {} to become {}", jobId, waitFor);
subscriber.sync();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Disconnect has been completed " + waitFor);
- }
+ LOGGER.debug("disconnect has been completed {}", waitFor);
} catch (InterruptedException ie) {
forceStop(subscriber, ie);
Thread.currentThread().interrupt();
@@ -513,13 +501,8 @@
ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker();
AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations();
int partition = 0;
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
- }
+ LOGGER.log(Level.INFO, "sending stop messages to {}", runtimeLocations);
for (String location : runtimeLocations.getLocations()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Sending to " + location);
- }
ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
runtimeId, new StopRuntimeParameters(timeout, unit)), location);
@@ -581,14 +564,10 @@
WaitForStateSubscriber subscriber;
Future<Void> suspendTask;
synchronized (this) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "suspending entity " + entityId);
- LOGGER.log(level, "Waiting for ongoing activities");
- }
+ LOGGER.log(level, "{} suspending entity {}", jobId, entityId);
+ LOGGER.log(level, "{} waiting for ongoing activities", jobId);
waitForNonTransitionState();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
- }
+ LOGGER.log(level, "{} proceeding with suspension. current state is {}", jobId, state);
if (state == ActivityState.STOPPED) {
suspended = true;
return;
@@ -609,12 +588,12 @@
doSuspend(metadataProvider);
return null;
});
- LOGGER.log(level, "Suspension task has been submitted");
+ LOGGER.log(level, "{} suspension task has been submitted", jobId);
}
try {
- LOGGER.log(level, "Waiting for suspension task to complete");
+ LOGGER.log(level, "{} waiting for suspension task to complete", jobId);
suspendTask.get();
- LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
+ LOGGER.log(level, "{} waiting for state to become SUSPENDED or TEMPORARILY_FAILED", jobId);
subscriber.sync();
suspended = true;
} catch (Exception e) {
@@ -736,7 +715,7 @@
@Override
public String toString() {
- return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\","
- + "\"state\":\"" + state + "\"" + "}";
+ return "{\"class\":\"" + getClass().getSimpleName() + "\", \"entityId\":\"" + entityId + "\", \"state\":\""
+ + state + "\", \"prev state\":\"" + prevState + "\", \"suspended\":" + suspended + "}";
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 3c277d5..662884d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -72,16 +72,14 @@
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId);
- }
if (eventKind == Kind.JOB_FINISHED) {
- LOGGER.log(level, "Removing job {}", jobId);
+ LOGGER.debug("removing ingestion job {}", jobId);
jobId2EntityId.remove(jobId);
}
if (listener != null) {
- LOGGER.log(level, "Notifying the listener");
listener.notify(event);
+ } else {
+ LOGGER.debug("listener not found for entity {} on event={}", entityId, event);
}
} else {
LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId);
@@ -92,45 +90,29 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "notifyJobCreation was called for job {}", jobId);
- }
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property);
+ if (property != null) {
+ LOGGER.debug("{} is not an ingestion job. job property={}", jobId, property);
}
return;
}
+ LOGGER.debug("notified of ingestion job creation {}", jobId);
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
- boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
}
private synchronized void monitorJob(JobId jobId, EntityId entityId) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "monitorJob was called for job {}", jobId);
- }
- boolean found = jobId2EntityId.get(jobId) != null;
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
- }
+ boolean found = jobId2EntityId.containsKey(jobId);
+ LOGGER.debug("{} was {}", jobId, (found ? "active" : "inactive"));
if (entityEventListeners.containsKey(entityId)) {
- if (jobId2EntityId.containsKey(jobId)) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("Job {} is already being monitored", jobId);
- }
+ if (found) {
+ LOGGER.error("{} is already being monitored", jobId);
return;
}
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Monitoring started for job {}", jobId);
- }
} else {
- if (LOGGER.isEnabled(level)) {
- LOGGER.info("No listener was found for the entity {} for job {}", entityId, jobId);
- }
+ LOGGER.debug("no listener found for entity {}; {}", entityId, jobId);
}
jobId2EntityId.put(jobId, entityId);
}
@@ -141,22 +123,18 @@
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
}
+ // else must be non-active job, e.g. a job for a query
}
@Override
public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
throws HyracksException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Getting notified of job finish for job {}", jobId);
- }
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
+ LOGGER.debug("notified of ingestion job finish {}", jobId);
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
- } else {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId);
- }
}
+ // else must be non-active job, e.g. a job for a query
}
// *** IActiveNotificationHandler
@@ -169,13 +147,6 @@
@Override
public IActiveEntityEventsListener getListener(EntityId entityId) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId);
- }
- IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Listener found: {}", listener);
- }
return entityEventListeners.get(entityId);
}
@@ -197,9 +168,7 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId());
- }
+ LOGGER.debug("register listener for entity {}, state={}", listener.getEntityId(), listener.getState());
if (entityEventListeners.containsKey(listener.getEntityId())) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
}
@@ -211,9 +180,7 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId());
- }
+ LOGGER.debug("unregister listener for entity {}, state={}", listener.getEntityId(), listener.getState());
IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
if (registeredListener == null) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId());
@@ -229,9 +196,8 @@
LOGGER.info("Starting active recovery");
for (IActiveEntityEventsListener listener : getEventListeners()) {
synchronized (listener) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState());
- }
+ LOGGER.debug("entity {} is {}, active={}, suspended={}", listener.getEntityId(), listener.getState(),
+ listener.isActive(), listener.isSuspended());
listener.notifyAll();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 65d1039..6570041 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -69,10 +69,8 @@
}
}
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId,
- requestId, uuid, contextId, status);
- }
+ LOGGER.debug("sending CancelQueryResponse to {}. reqId:{}, uuid:{}, contextId:{}, status:{}", nodeId, requestId,
+ uuid, contextId, status);
CancelQueryResponse response = new CancelQueryResponse(reqId, status);
CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
try {
@@ -82,4 +80,9 @@
}
}
+ @Override
+ public String toString() {
+ return "CancelQueryRequest{from='" + nodeId + "', reqId=" + reqId + ", uuid='" + uuid + "', contextId='"
+ + contextId + "'}";
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
index d65ae31..68d3430 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -49,4 +49,8 @@
return status;
}
+ @Override
+ public String toString() {
+ return "CancelQueryResponse{reqId=" + reqId + ", status=" + status + '}';
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
index ac87f7e..ef348fe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -64,16 +64,16 @@
final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
try {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO,
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.log(Level.TRACE,
ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms");
}
Thread.sleep(time);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO,
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.log(Level.TRACE,
ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms");
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index b123a5e..ae903d1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -77,6 +77,11 @@
ensureMaxCapacity();
}
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ return resourceManager.getCurrentCapacity();
+ }
+
private void ensureMaxCapacity() {
final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 9f4541f5..0c74260 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -74,7 +74,7 @@
}
protected void flushIfNotFailed() throws HyracksDataException {
- if (!failed && appender.getTupleCount() > 0) {
+ if (!failed && appender != null && appender.getTupleCount() > 0) {
flushAndReset();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index 9e38a20..b18bcb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -24,6 +24,11 @@
public class DefaultJobCapacityController implements IJobCapacityController {
public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+ private static final IClusterCapacity CAPACITY = new ClusterCapacity();
+ static {
+ CAPACITY.setAggregatedCores(Integer.MAX_VALUE);
+ CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+ }
private DefaultJobCapacityController() {
}
@@ -37,4 +42,9 @@
public void release(JobSpecification job) {
// No operation here.
}
+
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ return CAPACITY;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index 5fa4bd9..f88baa2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -57,4 +57,10 @@
*/
void release(JobSpecification job);
+ /**
+ * The cluster current capacity.
+ *
+ * @return the cluster current capacity.
+ */
+ IReadOnlyClusterCapacity getClusterCapacity();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acd..9891850 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@
private void abortTaskCluster(TaskClusterAttempt tcAttempt,
TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
- LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+ LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+ LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
INodeManager nodeManager = ccs.getNodeManager();
abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
final NodeControllerState node = nodeManager.getNodeControllerState(key);
if (node != null) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key);
- }
+ LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
try {
node.getNodeController().abortTasks(jobId, abortTaskAttempts);
} catch (Exception e) {
@@ -683,7 +681,6 @@
*/
public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
- LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@
LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId()
+ " as failed and the number of max re-attempts = " + maxReattempts);
if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
- LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId());
+ LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId());
abortJob(exceptions, NoOpCallback.INSTANCE);
return;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..ad97188 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -38,7 +38,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -129,7 +131,7 @@
if (activeRunMap.containsKey(jobId)) {
JobRun jobRun = activeRunMap.get(jobId);
// The following call will abort all ongoing tasks and then consequently
- // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
+ // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
return;
@@ -139,7 +141,7 @@
if (jobRun != null) {
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
- // Since the job has not been executed, we only need to update its status and lifecyle here.
+ // Since the job has not been executed, we only need to update its status and lifecycle here.
jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
@@ -170,7 +172,6 @@
return;
}
if (run.getPendingStatus() != null) {
- LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
return;
}
Set<String> targetNodes = run.getParticipatingNodeIds();
@@ -313,6 +314,7 @@
run.setStartTime(System.currentTimeMillis());
run.setStartTimeZoneId(ZoneId.systemDefault().getId());
JobId jobId = run.getJobId();
+ logJobCapacity(run, "running", Level.DEBUG);
activeRunMap.put(jobId, run);
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
@@ -320,6 +322,7 @@
// Queue a job when the required capacity for the job is not met.
private void queueJob(JobRun jobRun) throws HyracksException {
+ logJobCapacity(jobRun, "queueing", Level.INFO);
jobRun.setStatus(JobStatus.PENDING, null);
jobQueue.add(jobRun);
}
@@ -355,5 +358,23 @@
private void releaseJobCapacity(JobRun jobRun) {
final JobSpecification job = jobRun.getJobSpecification();
jobCapacityController.release(job);
+ logJobCapacity(jobRun, "released", Level.DEBUG);
+ }
+
+ private void logJobCapacity(JobRun jobRun, String jobStateDesc, Level lvl) {
+ IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity();
+ if (requiredResources == null) {
+ return;
+ }
+ long requiredMemory = requiredResources.getAggregatedMemoryByteSize();
+ int requiredCPUs = requiredResources.getAggregatedCores();
+ if (requiredMemory == 0 && requiredCPUs == 0) {
+ return;
+ }
+ IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity();
+ LOGGER.log(lvl, "{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}",
+ jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs,
+ clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(),
+ getRunningJobsCount(), jobQueue.size());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b53..b5df593 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -174,8 +174,8 @@
}
public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+ if (partitionIds != null && !partitionIds.isEmpty()) {
+ LOGGER.debug("Removing uncommitted partitions {}", partitionIds);
}
IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
@Override
@@ -195,8 +195,8 @@
}
public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing partition requests: " + partitionIds);
+ if (partitionIds != null && !partitionIds.isEmpty()) {
+ LOGGER.debug("Removing partition requests {}", partitionIds);
}
IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e2..46dd351 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -45,7 +45,6 @@
import org.apache.hyracks.control.common.result.AbstractResultManager;
import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -79,9 +78,7 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
- }
+ LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId);
if (jobResultLocations.get(jobId) != null) {
throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
}
@@ -157,15 +154,14 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
- Level logLevel = Level.DEBUG;
- if (LOGGER.isEnabled(logLevel)) {
- LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex);
- }
ResultJobRecord rjr = getResultJobRecord(jobId);
+ if (logFailure(rjr)) {
+ LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName());
+ }
if (rjr != null) {
rjr.fail(exceptions);
}
+ Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
if (jobResultInfo != null) {
jobResultInfo.setException(ex);
@@ -211,6 +207,15 @@
}
}
+ private static boolean logFailure(ResultJobRecord rjr) {
+ if (rjr == null) {
+ return true;
+ }
+ // don't re-log if the state is already failed
+ ResultJobRecord.Status status = rjr.getStatus();
+ return status == null || status.getState() != State.FAILED;
+ }
+
/**
* Compares the records already known by the client for the given job's result set id with the records that the
* result directory service knows and if there are any newly discovered records returns a whole array with the
@@ -264,7 +269,7 @@
class JobResultInfo {
- private ResultJobRecord record;
+ private final ResultJobRecord record;
private Waiters waiters;
private Exception exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 260c6b9..38277c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -121,4 +121,9 @@
public void clear() {
jobListMap.clear();
}
+
+ @Override
+ public int size() {
+ return jobListMap.size();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index be40883..1f2c29a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -73,4 +73,11 @@
* Clears the job queue
*/
void clear();
+
+ /**
+ * Returns the number of queued jobs.
+ *
+ * @return the number of queued jobs.
+ */
+ int size();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 771832e..6630ba7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,8 +35,8 @@
public class ApplicationMessageWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
String nodeId) {
@@ -57,6 +58,11 @@
}
@Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
+
+ @Override
public String toString() {
return getName() + ": nodeID: " + nodeId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index c36b887..f08e209 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -24,10 +24,12 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
public class GetNodeControllersInfoWork extends AbstractWork {
+
private final INodeManager nodeManager;
- private IResultCallback<Map<String, NodeControllerInfo>> callback;
+ private final IResultCallback<Map<String, NodeControllerInfo>> callback;
public GetNodeControllersInfoWork(INodeManager nodeManager,
IResultCallback<Map<String, NodeControllerInfo>> callback) {
@@ -39,4 +41,9 @@
public void run() {
callback.setValue(nodeManager.getNodeControllerInfoMap());
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
index bf95ff2..b7dbd75 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
@@ -22,8 +22,10 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
public class GetResultDirectoryAddressWork extends SynchronizableWork {
+
private final ClusterControllerService ccs;
private final IResultCallback<NetworkAddress> callback;
@@ -42,4 +44,9 @@
callback.setException(e);
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index d1d2269..1e34b96 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.control.cc.result.IResultDirectoryService;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
public class GetResultPartitionLocationsWork extends SynchronizableWork {
private final ClusterControllerService ccs;
@@ -68,4 +69,9 @@
public String toString() {
return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Known@" + Arrays.toString(knownRecords);
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 77d2f82..6fe9909 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -28,17 +28,19 @@
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class JobCleanupWork extends AbstractWork {
+
private static final Logger LOGGER = LogManager.getLogger();
- private IJobManager jobManager;
- private JobId jobId;
- private JobStatus status;
- private List<Exception> exceptions;
- private IResultCallback<Void> callback;
+ private final IJobManager jobManager;
+ private final JobId jobId;
+ private final JobStatus status;
+ private final List<Exception> exceptions;
+ private final IResultCallback<Void> callback;
public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions,
IResultCallback<Void> callback) {
@@ -51,12 +53,10 @@
@Override
public void run() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Cleanup for job: {}", jobId);
- }
+ LOGGER.info("cleaning up {} on NCs, status={}", jobId, status);
final JobRun jobRun = jobManager.get(jobId);
if (jobRun == null) {
- LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId);
+ LOGGER.debug("ignoring cleanup for unknown {}", jobId);
return;
}
try {
@@ -80,4 +80,9 @@
return getName() + ": JobId@" + jobId + " Status@" + status
+ (exceptions == null ? "" : " Exceptions@" + exceptions);
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index cfedfc9..7606dc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
public class JobStartWork extends SynchronizableWork {
private final ClusterControllerService ccs;
@@ -85,4 +86,9 @@
callback.setException(e);
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index c3a09f9..76a72c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -35,6 +35,7 @@
import org.apache.logging.log4j.Logger;
public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
+
private static final Logger LOGGER = LogManager.getLogger();
private final JobId jobId;
@@ -46,6 +47,7 @@
@Override
public void runWork() {
+ LOGGER.debug("node {} finished job clean-up {}", nodeId, jobId);
IJobManager jobManager = ccs.getJobManager();
final JobRun run = jobManager.get(jobId);
if (run == null) {
@@ -82,4 +84,9 @@
public String toString() {
return getName() + " jobId:" + jobId + ", nodeId:" + nodeId;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index ec21785..810fda2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -83,4 +84,9 @@
nc.sendRegistrationResult(params, e);
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index ee10669..9f740ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -50,9 +50,7 @@
Collection<JobId> affectedJobIds = result.getRight();
int size = affectedJobIds.size();
if (size > 0) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Number of affected jobs: " + size);
- }
+ LOGGER.info("number of affected jobs due to dead nodes removal {}", size);
IJobManager jobManager = ccs.getJobManager();
for (JobId jobId : affectedJobIds) {
JobRun run = jobManager.get(jobId);
@@ -71,6 +69,6 @@
@Override
public Level logLevel() {
- return Level.DEBUG;
+ return Level.TRACE;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index 0c53142..80dbd2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -29,8 +29,13 @@
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class TaskCompleteWork extends AbstractTaskLifecycleWork {
+
+ private static final Logger LOGGER = LogManager.getLogger();
private final TaskProfile statistics;
public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -46,6 +51,7 @@
if (run == null) {
return;
}
+ LOGGER.debug("node completed task {}:{}:{}", nodeId, jobId, taId);
if (statistics != null) {
JobProfile jobProfile = run.getJobProfile();
Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
@@ -63,4 +69,9 @@
public String toString() {
return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]";
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066e..48fd403 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,14 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.ErrorMessageUtil;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
- private static final Logger LOGGER = LogManager.getLogger();
+
private final List<Exception> exceptions;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -43,9 +40,6 @@
@Override
protected void performEvent(TaskAttempt ta) {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN,
- "Executing task failure work for " + this, ex);
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
if (run == null) {
@@ -57,6 +51,7 @@
@Override
public String toString() {
- return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "]";
+ return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "] "
+ + ErrorMessageUtil.getCauseMessage(exceptions.get(0));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index ed3e574..63d5340 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
public class WaitForJobCompletionWork extends SynchronizableWork {
private final ClusterControllerService ccs;
@@ -92,4 +93,9 @@
public String toString() {
return getName() + " jobId:" + jobId;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index eee8950..e52e3ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -52,7 +52,6 @@
private boolean failed;
protected boolean flushRequest;
private boolean deallocated;
- private Level openCloseLevel = Level.DEBUG;
private Thread dataConsumerThread;
public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -181,9 +180,6 @@
@Override
public void open() throws HyracksDataException {
- if (LOGGER.isEnabled(openCloseLevel)) {
- LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId);
- }
size = 0;
eos = false;
failed = false;
@@ -215,9 +211,6 @@
@Override
public void close() throws HyracksDataException {
- if (LOGGER.isEnabled(openCloseLevel)) {
- LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId);
- }
if (writeHandle != null) {
ctx.getIoManager().close(writeHandle);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 6d4f173..f386a89 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -29,11 +29,12 @@
import org.apache.logging.log4j.Logger;
public class ApplicationMessageWork extends AbstractWork {
+
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
- private String nodeId;
- private NodeControllerService ncs;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
+ private final String nodeId;
+ private final NodeControllerService ncs;
public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) {
this.ncs = ncs;
@@ -59,6 +60,11 @@
}
@Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
+
+ @Override
public String toString() {
return getName() + ": nodeId: " + nodeId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 75edd38..2036d72 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -25,10 +25,12 @@
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CleanupJobletWork extends AbstractWork {
+
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
@@ -45,7 +47,7 @@
@Override
public void run() {
- LOGGER.debug("cleaning up after job: {}", jobId);
+ LOGGER.debug("cleaning up {}, status:{}", jobId, status);
ncs.removeJobParameterByteStore(jobId);
ncs.getPartitionManager().jobCompleted(jobId, status);
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
@@ -59,4 +61,9 @@
public String toString() {
return getName() + " jobId:" + jobId + ", status:" + status;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 60860c5..52469dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.nc.work;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -38,13 +40,16 @@
@Override
public void run() {
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(),
+ JobId jobId = task.getJoblet().getJobId();
+ TaskAttemptId taskAttemptId = task.getTaskAttemptId();
+ LOGGER.debug("notifying CC of task complete {}:{}", jobId, taskAttemptId);
+ TaskProfile taskProfile = new TaskProfile(taskAttemptId, task.getPartitionSendProfile(),
task.getStatsCollector(), task.getWarnings(), task.getWarningCollector().getTotalWarningsCount());
try {
- ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete(
- task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile);
+ ncs.getClusterController(task.getJobletContext().getJobId().getCcId())
+ .notifyTaskComplete(task.getJobletContext().getJobId(), taskAttemptId, ncs.getId(), taskProfile);
} catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failed notifying task complete for " + task.getTaskAttemptId(), e);
+ LOGGER.log(Level.ERROR, "Failed notifying task complete for {}", taskAttemptId, e);
}
task.getJoblet().removeTask(task);
}
@@ -54,4 +59,9 @@
return getName() + ": [" + ncs.getId() + "[" + task.getJoblet().getJobId() + ":" + task.getTaskAttemptId()
+ "]";
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa..cd79da7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.ErrorMessageUtil;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
@@ -50,9 +50,6 @@
@Override
public void run() {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, "task " + taskId + " has failed",
- ex);
try {
IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
if (resultPartitionManager != null) {
@@ -69,6 +66,8 @@
@Override
public String toString() {
- return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]";
+ return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]"
+ + ((exceptions != null && !exceptions.isEmpty())
+ ? " " + ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : "");
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index f277046..dd4a956 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -318,6 +318,6 @@
@Override
public String toString() {
- return getName() + " jobId:" + jobId;
+ return getName() + " jobId:" + jobId + " tasks:" + taskDescriptors.size();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 555e8fb..0f31491 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -319,9 +319,6 @@
if (!failed) {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("OptimizedHybridHashJoin closed its build phase");
- }
} else {
state.hybridHJ.clearBuildTempFiles();
}
@@ -402,10 +399,6 @@
writer.open();
state.hybridHJ.initProbe(probComp);
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
- }
}
@Override
@@ -416,7 +409,7 @@
@Override
public void fail() throws HyracksDataException {
failed = true;
- if (state.hybridHJ != null) {
+ if (state != null && state.hybridHJ != null) {
state.hybridHJ.fail();
}
writer.fail();
@@ -427,12 +420,13 @@
if (failed) {
try {
// Clear temp files if fail() was called.
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+ if (state != null && state.hybridHJ != null) {
+ state.hybridHJ.clearBuildTempFiles();
+ state.hybridHJ.clearProbeTempFiles();
+ }
} finally {
writer.close(); // writer should always be closed.
}
- logProbeComplete();
return;
}
try {
@@ -477,17 +471,7 @@
// Re-throw the whatever is caught.
throw e;
} finally {
- try {
- logProbeComplete();
- } finally {
- writer.close();
- }
- }
- }
-
- private void logProbeComplete() {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("OptimizedHybridHashJoin closed its probe phase");
+ writer.close();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index 08f15b3..a1704ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -41,13 +41,9 @@
import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory;
import org.apache.hyracks.dataflow.std.structures.MaxHeap;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TupleSorterHeapSort implements ITupleSorter {
- private static final Logger LOGGER = LogManager.getLogger();
-
class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
@Override
public IResetableComparable<HeapEntry> createResetableComparable() {
@@ -288,7 +284,6 @@
int maxFrameSize = outputFrame.getFrameSize();
int numEntries = heap.getNumEntries();
IResetableComparable[] entries = heap.getEntries();
- int io = 0;
for (int i = 0; i < numEntries; i++) {
HeapEntry minEntry = (HeapEntry) entries[i];
bufferAccessor1.reset(minEntry.tuplePointer);
@@ -296,14 +291,10 @@
bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
if (flushed > 0) {
maxFrameSize = Math.max(maxFrameSize, flushed);
- io++;
}
}
maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
outputAppender.write(writer, true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
- }
return maxFrameSize;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index be22b9c..7a75a0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -34,7 +34,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.client.result.ResultSet;
@@ -254,6 +256,14 @@
public void release(JobSpecification job) {
}
+
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ ClusterCapacity clusterCapacity = new ClusterCapacity();
+ clusterCapacity.setAggregatedMemoryByteSize(maxRAM);
+ clusterCapacity.setAggregatedCores(Integer.MAX_VALUE);
+ return clusterCapacity;
+ }
};
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 1ff9fa8..842ec61 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -34,7 +34,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -80,11 +79,7 @@
*/
public static void get(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value)
throws HyracksDataException {
- boolean loggable = LOGGER.isDebugEnabled();
value.reset();
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index);
- }
// Lock the opTracker to ensure index components don't change
synchronized (index.getOperationTracker()) {
ILSMMemoryComponent cmc = index.getCurrentMemoryComponent();
@@ -92,33 +87,17 @@
index.getCurrentMemoryComponent().getMetadata().get(key, value);
}
if (value.getLength() == 0) {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was not found in mutable memory component of " + index);
- }
- // was not found in the in current mutable component, search in the other in memory components
+ // was not found in the in current mutable component, search in the other in-memory components
fromImmutableMemoryComponents(index, key, value);
if (value.getLength() == 0) {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was not found in all immmutable memory components of " + index);
- }
- // was not found in the in all in memory components, search in the disk components
+ // was not found in all in-memory components, search in the disk components
fromDiskComponents(index, key, value);
- if (loggable) {
- if (value.getLength() == 0) {
- LOGGER.log(Level.DEBUG, key + " was not found in all disk components of " + index);
- } else {
- LOGGER.log(Level.DEBUG, key + " was found in disk components of " + index);
- }
- }
- } else {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was found in the immutable memory components of " + index);
+ if (value.getLength() == 0) {
+ LOGGER.debug("{} was NOT found", key);
}
}
} else {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was found in mutable memory component of " + index);
- }
+ LOGGER.debug("{} was found in mutable memory component {}", key, cmc);
}
}
}
@@ -143,17 +122,11 @@
private static void fromDiskComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value)
throws HyracksDataException {
- boolean loggable = LOGGER.isDebugEnabled();
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components of " + index);
- }
for (ILSMDiskComponent c : index.getDiskComponents()) {
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components " + c);
- }
c.getMetadata().get(key, value);
if (value.getLength() != 0) {
// Found
+ LOGGER.debug("{} was found in disk component {}", key, c);
return;
}
}
@@ -161,21 +134,10 @@
private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key,
ArrayBackedValueStorage value) throws HyracksDataException {
- boolean loggable = LOGGER.isDebugEnabled();
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable memory components of " + index);
- }
List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
int numOtherMemComponents = memComponents.size() - 1;
int next = index.getCurrentMemoryComponentIndex();
- if (loggable) {
- LOGGER.log(Level.DEBUG, index + " has " + numOtherMemComponents + " immutable memory components");
- }
for (int i = 0; i < numOtherMemComponents; i++) {
- if (loggable) {
- LOGGER.log(Level.DEBUG,
- "trying to get " + key + " from immutable memory components number: " + (i + 1));
- }
next = next - 1;
if (next < 0) {
next = memComponents.size() - 1;
@@ -185,6 +147,7 @@
c.getMetadata().get(key, value);
if (value.getLength() != 0) {
// Found
+ LOGGER.debug("{} was found in immutable memory component {}", key, c);
return;
}
}