Merge branch 'gerrit/trinity' into 'master'
Change-Id: I869ea2df850ce020a23e7c4beaf61aebda6ef373
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index d6d9b96..279bba1 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -20,6 +20,7 @@
import java.util.Objects;
+import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.hyracks.api.job.JobId;
public class ActiveEvent {
@@ -65,7 +66,12 @@
@Override
public String toString() {
- return "JobId:" + jobId + "," + "EntityId:" + entityId + ", " + "Kind" + eventKind;
+ String kindDesc = "";
+ if (eventObject instanceof ActivePartitionMessage) {
+ ActivePartitionMessage partitionEvent = (ActivePartitionMessage) eventObject;
+ kindDesc = '-' + String.valueOf(partitionEvent.getEvent()) + '(' + partitionEvent.getDesc() + ')';
+ }
+ return jobId + ", " + "EntityId:" + entityId + ", " + "Kind:" + eventKind + kindDesc;
}
@Override
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index b99e4f2..636279c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -103,8 +103,8 @@
return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
}
- public void submit(ActiveManagerMessage message) throws HyracksDataException {
- LOGGER.debug("Message of type {} received in {}", message.getKind(), nodeId);
+ public void handle(ActiveManagerMessage message) throws HyracksDataException {
+ LOGGER.debug("NC handling {}({})({})", message.getKind(), message.getRuntimeId(), message.getDesc());
switch (message.getKind()) {
case STOP_ACTIVITY:
stopRuntime(message);
@@ -125,7 +125,7 @@
ActiveRuntimeId runtimeId = message.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
- LOGGER.warn("Request for a runtime {} that is not registered {}", runtimeId, message);
+ LOGGER.warn("not handling {} for a runtime {} that is not registered", message, runtimeId);
return;
}
runtime.handleGenericEvent(message);
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 453ffa0..9123503 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -98,14 +97,14 @@
try {
// notify cc that runtime has been registered
ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
- Event.RUNTIME_REGISTERED, null), null);
+ Event.RUNTIME_REGISTERED, null, ""), null);
start();
} catch (InterruptedException e) {
- LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e);
+ LOGGER.info("ingestion op interrupted", e);
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
} catch (Exception e) {
- LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e);
+ logIngestionFailure(e);
throw HyracksDataException.create(e);
} finally {
synchronized (this) {
@@ -121,12 +120,12 @@
activeManager.deregisterRuntime(runtimeId);
try {
ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
- Event.RUNTIME_DEREGISTERED, null), null);
+ Event.RUNTIME_DEREGISTERED, null, ""), null);
} catch (Exception e) {
- LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e);
+ LOGGER.info("ingestion op stopped w/ failure", e);
throw HyracksDataException.create(e);
} finally {
- LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable");
+ LOGGER.info("ingestion op stopped");
}
}
@@ -134,4 +133,12 @@
public final IFrameWriter getInputFrameWriter(int index) {
return null;
}
+
+ private void logIngestionFailure(Exception e) {
+ if (e.getCause() instanceof InterruptedException) {
+ LOGGER.info("ingestion op interrupted", e);
+ } else {
+ LOGGER.info("ingestion op failed", e);
+ }
+ }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index bad3f79..692bbb9 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -34,15 +34,17 @@
GENERIC_EVENT
}
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
private final Kind kind;
private final ActiveRuntimeId runtimeId;
private final Serializable payload;
+ private final String desc;
- public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload) {
+ public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload, String desc) {
this.kind = kind;
this.runtimeId = runtimeId;
this.payload = payload;
+ this.desc = desc;
}
public Serializable getPayload() {
@@ -57,13 +59,18 @@
return kind;
}
+ public String getDesc() {
+ return desc;
+ }
+
@Override
public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((ActiveManager) appCtx.getActiveManager()).submit(this);
+ ((ActiveManager) appCtx.getActiveManager()).handle(this);
}
@Override
public String toString() {
- return getClass().getSimpleName() + "{" + "kind=" + kind + ", runtimeId=" + runtimeId + '}';
+ return getClass().getSimpleName() + "{kind=" + kind + ", runtimeId=" + runtimeId
+ + (desc != null && !desc.isEmpty() ? ", desc=" + desc : "") + '}';
}
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index cb9c61b..2955271 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -29,23 +29,27 @@
import org.apache.hyracks.api.job.JobId;
public class ActivePartitionMessage implements ICcAddressedMessage {
+
public enum Event {
RUNTIME_REGISTERED,
RUNTIME_DEREGISTERED,
GENERIC_EVENT
}
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final ActiveRuntimeId activeRuntimeId;
private final JobId jobId;
private final Serializable payload;
+ private final String desc;
private final Event event;
- public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload) {
+ public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload,
+ String desc) {
this.activeRuntimeId = activeRuntimeId;
this.jobId = jobId;
this.event = event;
this.payload = payload;
+ this.desc = desc;
}
public ActiveRuntimeId getActiveRuntimeId() {
@@ -64,6 +68,10 @@
return event;
}
+ public String getDesc() {
+ return desc;
+ }
+
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
IActiveNotificationHandler activeListener = (IActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -72,7 +80,7 @@
@Override
public String toString() {
- return activeRuntimeId + ":" + ActivePartitionMessage.class.getSimpleName() + '-' + event;
+ return activeRuntimeId + ":" + ActivePartitionMessage.class.getSimpleName() + '-' + event + '(' + desc + ')';
}
@Override
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 94668a0..2e5a571 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -25,7 +25,7 @@
private final long reqId;
public ActiveStatsRequestMessage(ActiveRuntimeId runtimeId, long reqId) {
- super(Kind.REQUEST_STATS, runtimeId, null);
+ super(Kind.REQUEST_STATS, runtimeId, null, "");
this.reqId = reqId;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index c730dd0..0977e46 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -273,7 +273,7 @@
try {
ActiveNotificationHandler activeNotificationHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
- activeNotificationHandler.suspend(metadataProvider);
+ activeNotificationHandler.suspend(metadataProvider, "rebalance api");
try {
IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), database,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index e8926b8..697cf21 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -138,7 +138,7 @@
}
protected synchronized void setState(ActivityState newState) {
- LOGGER.log(level, "State of {} is being set to {} from {}", getEntityId(), newState, state);
+ LOGGER.log(level, "state of {} is being set from {} to {}", getEntityId(), state, newState);
this.prevState = state;
this.state = newState;
if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING
@@ -153,9 +153,8 @@
@Override
public synchronized void notify(ActiveEvent event) {
try {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "EventListener is notified.");
- }
+ LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState,
+ suspended);
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
@@ -194,26 +193,21 @@
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Active job {} finished", jobId);
- }
JobId lastJobId = jobId;
+ Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
if (numRegistered != numDeRegistered) {
LOGGER.log(Level.WARN,
- "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
- numRegistered, numDeRegistered);
+ "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}",
+ jobId, status, numRegistered, numDeRegistered);
}
jobId = null;
- Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus);
- }
+ LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus);
if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
- LOGGER.error("Active Job {} failed", lastJobId, jobFailure);
+ LOGGER.error("ingestion job {} failed", lastJobId, jobFailure);
setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED
: ActivityState.TEMPORARILY_FAILED);
if (prevState == ActivityState.RUNNING) {
@@ -371,16 +365,14 @@
@Override
public synchronized void recover() {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Recover is called on {}", entityId);
- }
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
+ LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId);
setState(ActivityState.STOPPED);
} else {
+ LOGGER.debug("recover is called on {}", entityId);
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
setState(ActivityState.TEMPORARILY_FAILED);
- LOGGER.log(level, "Recovery task has been submitted");
+ LOGGER.debug("recovery task has been submitted");
rt = createRecoveryTask();
executor.submit(rt.recover());
}
@@ -479,15 +471,11 @@
// Note: once we start sending stop messages, we can't go back until the entity is stopped
final String nameBefore = Thread.currentThread().getName();
try {
- Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+ Thread.currentThread().setName(nameBefore + " : wait-for-ingestion-completion: " + jobId);
sendStopMessages(metadataProvider, timeout, unit);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Waiting for its state to become " + waitFor);
- }
+ LOGGER.debug("waiting for {} to become {}", jobId, waitFor);
subscriber.sync();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Disconnect has been completed " + waitFor);
- }
+ LOGGER.debug("disconnect has been completed {}", waitFor);
} catch (InterruptedException ie) {
forceStop(subscriber, ie);
Thread.currentThread().interrupt();
@@ -517,12 +505,9 @@
LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
}
for (String location : runtimeLocations.getLocations()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Sending to " + location);
- }
ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
- runtimeId, new StopRuntimeParameters(timeout, unit)), location);
+ runtimeId, new StopRuntimeParameters(timeout, unit), ""), location);
}
}
@@ -737,7 +722,7 @@
@Override
public String toString() {
- return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\","
- + "\"state\":\"" + state + "\"" + "}";
+ return "{\"class\":\"" + getClass().getSimpleName() + "\", \"entityId\":\"" + entityId + "\", \"state\":\""
+ + state + "\", \"prev state\":\"" + prevState + "\", \"suspended\":" + suspended + "}";
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 3c277d5..8821c67 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -72,19 +72,17 @@
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId);
- }
if (eventKind == Kind.JOB_FINISHED) {
- LOGGER.log(level, "Removing job {}", jobId);
+ LOGGER.debug("removing ingestion job {}", jobId);
jobId2EntityId.remove(jobId);
}
if (listener != null) {
- LOGGER.log(level, "Notifying the listener");
listener.notify(event);
+ } else {
+ LOGGER.debug("listener not found for entity {} on event={}", entityId, event);
}
} else {
- LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId);
+ LOGGER.error("entity not found for event {}", event);
}
}
@@ -92,45 +90,30 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "notifyJobCreation was called for job {}", jobId);
- }
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property);
+ if (property != null) {
+ LOGGER.debug("{} is not an active job. job property={}", jobId, property);
}
return;
}
+ LOGGER.debug("notified of ingestion job creation {}", jobId);
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
- boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
}
private synchronized void monitorJob(JobId jobId, EntityId entityId) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "monitorJob was called for job {}", jobId);
- }
- boolean found = jobId2EntityId.get(jobId) != null;
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
- }
+ boolean found = jobId2EntityId.containsKey(jobId);
+ LOGGER.debug("{} is {}", jobId, (found ? "active" : "inactive"));
if (entityEventListeners.containsKey(entityId)) {
- if (jobId2EntityId.containsKey(jobId)) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("Job {} is already being monitored", jobId);
- }
+ if (found) {
+ LOGGER.error("{} is already being monitored", jobId);
return;
}
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Monitoring started for job {}", jobId);
- }
+ LOGGER.debug("monitoring started for {}", jobId);
} else {
- if (LOGGER.isEnabled(level)) {
- LOGGER.info("No listener was found for the entity {} for job {}", entityId, jobId);
- }
+ LOGGER.debug("no listener found for entity {}; {}", entityId, jobId);
}
jobId2EntityId.put(jobId, entityId);
}
@@ -141,22 +124,18 @@
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
}
+ // else must be non-active job, e.g. a job for a query
}
@Override
public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
throws HyracksException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Getting notified of job finish for job {}", jobId);
- }
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
+ LOGGER.debug("notified of ingestion job finish {}", jobId);
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
- } else {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId);
- }
}
+ // else must be non-active job, e.g. a job for a query
}
// *** IActiveNotificationHandler
@@ -169,13 +148,6 @@
@Override
public IActiveEntityEventsListener getListener(EntityId entityId) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId);
- }
- IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Listener found: {}", listener);
- }
return entityEventListeners.get(entityId);
}
@@ -197,9 +169,7 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId());
- }
+ LOGGER.debug("register listener for entity {}, state={}", listener.getEntityId(), listener.getState());
if (entityEventListeners.containsKey(listener.getEntityId())) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
}
@@ -211,9 +181,7 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId());
- }
+ LOGGER.debug("unregister listener for entity {}, state={}", listener.getEntityId(), listener.getState());
IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
if (registeredListener == null) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId());
@@ -229,20 +197,19 @@
LOGGER.info("Starting active recovery");
for (IActiveEntityEventsListener listener : getEventListeners()) {
synchronized (listener) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState());
- }
+ LOGGER.debug("entity {} is {}, active={}, suspended={}", listener.getEntityId(), listener.getState(),
+ listener.isActive(), listener.isSuspended());
listener.notifyAll();
}
}
}
- public void suspend(MetadataProvider mdProvider) throws HyracksDataException {
+ public void suspend(MetadataProvider mdProvider, String reason) throws HyracksDataException {
synchronized (this) {
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
}
- LOGGER.log(level, "Suspending active events handler");
+ LOGGER.debug("suspending active events handler. reason {}", reason);
suspended = true;
}
Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 65d1039..6570041 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -69,10 +69,8 @@
}
}
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId,
- requestId, uuid, contextId, status);
- }
+ LOGGER.debug("sending CancelQueryResponse to {}. reqId:{}, uuid:{}, contextId:{}, status:{}", nodeId, requestId,
+ uuid, contextId, status);
CancelQueryResponse response = new CancelQueryResponse(reqId, status);
CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
try {
@@ -82,4 +80,9 @@
}
}
+ @Override
+ public String toString() {
+ return "CancelQueryRequest{from='" + nodeId + "', reqId=" + reqId + ", uuid='" + uuid + "', contextId='"
+ + contextId + "'}";
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
index d65ae31..68d3430 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -49,4 +49,8 @@
return status;
}
+ @Override
+ public String toString() {
+ return "CancelQueryResponse{reqId=" + reqId + ", status=" + status + '}';
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index ed81dd8..0cdbf95 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -150,7 +150,8 @@
ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
- ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+ ExecuteStatementResponseMessage responseMsg =
+ new ExecuteStatementResponseMessage(requestMessageId, clientContextID, requestReference.getUuid());
final IStatementExecutor.StatementProperties statementProperties = new IStatementExecutor.StatementProperties();
responseMsg.setStatementProperties(statementProperties);
try {
@@ -230,9 +231,10 @@
return null;
}
- protected static void sendRejection(Exception reason, CCMessageBroker messageBroker, long requestMessageId,
+ protected void sendRejection(Exception reason, CCMessageBroker messageBroker, long requestMessageId,
String requestNodeId) {
- ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+ ExecuteStatementResponseMessage responseMsg =
+ new ExecuteStatementResponseMessage(requestMessageId, clientContextID, requestReference.getUuid());
responseMsg.setError(reason);
try {
messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 2cdede1..eaadebe 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -31,9 +31,12 @@
import org.apache.hyracks.api.exceptions.Warning;
public final class ExecuteStatementResponseMessage implements INcAddressedMessage {
- private static final long serialVersionUID = 1L;
+
+ private static final long serialVersionUID = 2L;
private final long requestMessageId;
+ private final String clientContextID;
+ private final String uuid;
private String result;
@@ -49,8 +52,10 @@
private Collection<Warning> warnings;
- public ExecuteStatementResponseMessage(long requestMessageId) {
+ public ExecuteStatementResponseMessage(long requestMessageId, String clientContextID, String uuid) {
this.requestMessageId = requestMessageId;
+ this.clientContextID = clientContextID;
+ this.uuid = uuid;
}
@Override
@@ -120,7 +125,7 @@
@Override
public String toString() {
- return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId,
- result != null ? result.length() : 0);
+ return String.format("%s(id=%s, uuid=%s, clientContextId=%s): %d characters", getClass().getSimpleName(),
+ requestMessageId, uuid, clientContextID, result != null ? result.length() : 0);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index bee5ff9..9d378f5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -100,4 +100,9 @@
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESPONSE;
}
+
+ @Override
+ public String toString() {
+ return "RegistrationTasksResponseMessage{from='" + nodeId + "'}"; // renders as ...{from='<nodeId>'}
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index b213e9d..f22962f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -195,7 +195,7 @@
try {
ActiveNotificationHandler activeNotificationHandler =
(ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler();
- activeNotificationHandler.suspend(metadataProvider);
+ activeNotificationHandler.suspend(metadataProvider, "");
try {
IMetadataLockManager lockManager = ccAppCtx.getMetadataLockManager();
lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), database,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 59e0878..7d66760 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -148,7 +148,7 @@
Assert.assertTrue(requestedStats.contains("N/A"));
// Fake partition message and notify eventListener
ActivePartitionMessage partitionMessage =
- new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null);
+ new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null, "");
partitionMessage.handle(appCtx);
start.sync();
if (start.hasFailed()) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
index 0c4b806..0eba381 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java
@@ -49,7 +49,7 @@
subscriber.beforeExecute();
}
ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
- new ActiveRuntimeId(entityId, nc.getId(), partition), jobId, Event.RUNTIME_REGISTERED, null));
+ new ActiveRuntimeId(entityId, nc.getId(), partition), jobId, Event.RUNTIME_REGISTERED, null, ""));
nc.getClusterController().activeEvent(event);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index f849f08..fcd4d85 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -92,7 +92,7 @@
subscriber.beforeExecute();
}
ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
- new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null));
+ new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null, ""));
clusterController.activeEvent(event);
}
};
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
index 919e0e4..2086daf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
@@ -280,7 +280,7 @@
Action action = new Action() {
@Override
protected void doExecute(MetadataProvider mdProvider) throws Exception {
- handler.suspend(mdProvider);
+ handler.suspend(mdProvider, "");
}
};
add(action);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 34f5114..1a45155 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -150,7 +150,7 @@
IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
if (dsr == null || iInfo == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
}
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 150d5ee..e3fe663 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -54,7 +54,6 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -173,11 +172,11 @@
synchronized (lsmIndex.getOperationTracker()) {
List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
if (diskComponents.isEmpty()) {
- LOGGER.log(Level.INFO, "There are no disk components");
+ LOGGER.info("there are no disk components for {}", lsmIndex);
return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
}
if (deletedComponents.contains(diskComponents.get(diskComponents.size() - 1))) {
- LOGGER.log(Level.INFO, "All disk components have been deleted");
+ LOGGER.info("all disk components have been deleted for {}", lsmIndex);
return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
}
int mostRecentComponentIndex = 0;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index d6ff691..b3eb3a3 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -307,7 +307,7 @@
3006 = Illegal field %1$s in closed type %2$s
3007 = Twitter4J library not found!
3008 = Unable to ingest data
-3009 = Exception in get record type %1$s for feed
+3009 = Exception in get record type %1$s for ingestion
3010 = Does not support Hive data with list of non-primitive types
3011 = Cannot get hive type for field of type %1$s
3012 = Failed to get columns of record
@@ -404,7 +404,7 @@
3105 = %1$s is already registered
3106 = %1$s is not registered
3107 = Active Notification Handler is already suspended
-3110 = Feed failed while reading a new record
+3110 = Ingestion failed while reading a new record
3111 = Feed %1$s is not connected to any dataset
3112 = Array/Multiset item cannot be null
3113 = Failed to parse record
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index ed46471..5de2717 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -98,7 +98,7 @@
}
}
} catch (HyracksDataException e) {
- LOGGER.log(Level.WARN, "Exception during ingestion", e);
+ logFailure(e);
if (e.matches(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
// Failure but we know we can for sure push the previously parsed records safely
failure = e;
@@ -113,7 +113,7 @@
}
} catch (Throwable e) {
failure = e;
- LOGGER.log(Level.WARN, "Failure while operating a feed source", e);
+ logFailure(e);
} finally {
failure = finish(failure);
}
@@ -126,7 +126,7 @@
}
private synchronized void setState(State newState) {
- LOGGER.log(Level.INFO, "State is being set from " + state + " to " + newState);
+ LOGGER.info("controller is being set from {} to {} ", state, newState);
state = newState;
}
@@ -289,4 +289,12 @@
public void handleGenericEvent(ActiveManagerMessage event) {
recordReader.handleGenericEvent(event);
}
+
+ private void logFailure(Throwable th) {
+ if (th instanceof InterruptedException || th.getCause() instanceof InterruptedException) {
+ LOGGER.warn("data flow controller interrupted", th);
+ } else {
+ LOGGER.warn("data flow controller failed", th);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 34d7065..ea1cc1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -121,7 +121,7 @@
101 = Page %1$s does not exist in file %2$s
102 = Failed to open virtual buffer cache since it is already open
103 = Failed to close virtual buffer cache since it is already closed
-104 = Index does not exist
+104 = Index does not exist (%1$s)
105 = Cannot drop in-use index (%1$s)
106 = Failed to deactivate the bloom filter since it is pinned by other users
107 = The given search predicate can't be null.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 771832e..6630ba7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,8 +35,8 @@
public class ApplicationMessageWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
String nodeId) {
@@ -57,6 +58,11 @@
}
@Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
+
+ @Override
public String toString() {
return getName() + ": nodeID: " + nodeId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index c36b887..f08e209 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -24,10 +24,12 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
public class GetNodeControllersInfoWork extends AbstractWork {
+
private final INodeManager nodeManager;
- private IResultCallback<Map<String, NodeControllerInfo>> callback;
+ private final IResultCallback<Map<String, NodeControllerInfo>> callback;
public GetNodeControllersInfoWork(INodeManager nodeManager,
IResultCallback<Map<String, NodeControllerInfo>> callback) {
@@ -39,4 +41,9 @@
public void run() {
callback.setValue(nodeManager.getNodeControllerInfoMap());
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
index bf95ff2..b7dbd75 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
@@ -22,8 +22,10 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
public class GetResultDirectoryAddressWork extends SynchronizableWork {
+
private final ClusterControllerService ccs;
private final IResultCallback<NetworkAddress> callback;
@@ -42,4 +44,9 @@
callback.setException(e);
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 77d2f82..6262c47 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -32,13 +32,14 @@
import org.apache.logging.log4j.Logger;
public class JobCleanupWork extends AbstractWork {
+
private static final Logger LOGGER = LogManager.getLogger();
- private IJobManager jobManager;
- private JobId jobId;
- private JobStatus status;
- private List<Exception> exceptions;
- private IResultCallback<Void> callback;
+ private final IJobManager jobManager;
+ private final JobId jobId;
+ private final JobStatus status;
+ private final List<Exception> exceptions;
+ private final IResultCallback<Void> callback;
public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions,
IResultCallback<Void> callback) {
@@ -51,12 +52,10 @@
@Override
public void run() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Cleanup for job: {}", jobId);
- }
+ LOGGER.info("cleaning up {} on NCs, status={}", jobId, status);
final JobRun jobRun = jobManager.get(jobId);
if (jobRun == null) {
- LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId);
+ LOGGER.debug("ignoring cleanup for unknown {}", jobId);
return;
}
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index cfedfc9..86e36d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -35,8 +35,13 @@
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class JobStartWork extends SynchronizableWork {
+
+ private static final Logger LOGGER = LogManager.getLogger();
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
private final Set<JobFlag> jobFlags;
@@ -67,6 +72,7 @@
JobId jobId;
JobRun run;
jobId = jobIdFactory.create();
+ LOGGER.debug("created {}", jobId);
if (deployedJobSpecId == null) {
//Need to create the ActivityClusterGraph
IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
@@ -85,4 +91,9 @@
callback.setException(e);
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index c3a09f9..76a72c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -35,6 +35,7 @@
import org.apache.logging.log4j.Logger;
public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
+
private static final Logger LOGGER = LogManager.getLogger();
private final JobId jobId;
@@ -46,6 +47,7 @@
@Override
public void runWork() {
+ LOGGER.debug("node {} finished job clean-up {}", nodeId, jobId);
IJobManager jobManager = ccs.getJobManager();
final JobRun run = jobManager.get(jobId);
if (run == null) {
@@ -82,4 +84,9 @@
public String toString() {
return getName() + " jobId:" + jobId + ", nodeId:" + nodeId;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index ec21785..810fda2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -83,4 +84,9 @@
nc.sendRegistrationResult(params, e);
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index ee10669..cd9b6d0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -50,9 +50,7 @@
Collection<JobId> affectedJobIds = result.getRight();
int size = affectedJobIds.size();
if (size > 0) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Number of affected jobs: " + size);
- }
+ LOGGER.info("number of affected jobs due to dead nodes removal {}", size);
IJobManager jobManager = ccs.getJobManager();
for (JobId jobId : affectedJobIds) {
JobRun run = jobManager.get(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index 0c53142..869caa5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -29,8 +29,13 @@
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class TaskCompleteWork extends AbstractTaskLifecycleWork {
+
+ private static final Logger LOGGER = LogManager.getLogger();
private final TaskProfile statistics;
public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -44,8 +49,10 @@
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
if (run == null) {
+ LOGGER.debug("node completed task for unknown job {}:{}:{}", nodeId, jobId, taId);
return;
}
+ LOGGER.debug("node completed task {}:{}:{}", nodeId, jobId, taId);
if (statistics != null) {
JobProfile jobProfile = run.getJobProfile();
Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
@@ -63,4 +70,9 @@
public String toString() {
return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]";
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 6d4f173..f386a89 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -29,11 +29,12 @@
import org.apache.logging.log4j.Logger;
public class ApplicationMessageWork extends AbstractWork {
+
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
- private String nodeId;
- private NodeControllerService ncs;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
+ private final String nodeId;
+ private final NodeControllerService ncs;
public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) {
this.ncs = ncs;
@@ -59,6 +60,11 @@
}
@Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
+
+ @Override
public String toString() {
return getName() + ": nodeId: " + nodeId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 75edd38..efc8467 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -25,10 +25,12 @@
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CleanupJobletWork extends AbstractWork {
+
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
@@ -45,7 +47,7 @@
@Override
public void run() {
- LOGGER.debug("cleaning up after job: {}", jobId);
+ LOGGER.debug("cleaning up {}", jobId);
ncs.removeJobParameterByteStore(jobId);
ncs.getPartitionManager().jobCompleted(jobId, status);
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
@@ -59,4 +61,9 @@
public String toString() {
return getName() + " jobId:" + jobId + ", status:" + status;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 60860c5..7e1b6f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.nc.work;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -38,20 +40,27 @@
@Override
public void run() {
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(),
+ JobId jobId = task.getJoblet().getJobId();
+ TaskAttemptId taskAttemptId = task.getTaskAttemptId();
+ LOGGER.debug("notifying CC of task complete {}:{}", jobId, taskAttemptId);
+ TaskProfile taskProfile = new TaskProfile(taskAttemptId, task.getPartitionSendProfile(),
task.getStatsCollector(), task.getWarnings(), task.getWarningCollector().getTotalWarningsCount());
try {
- ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete(
- task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile);
+ ncs.getClusterController(task.getJobletContext().getJobId().getCcId())
+ .notifyTaskComplete(task.getJobletContext().getJobId(), taskAttemptId, ncs.getId(), taskProfile);
} catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failed notifying task complete for " + task.getTaskAttemptId(), e);
+ LOGGER.log(Level.ERROR, "Failed notifying task complete for {}", taskAttemptId, e);
}
task.getJoblet().removeTask(task);
}
@Override
public String toString() {
- return getName() + ": [" + ncs.getId() + "[" + task.getJoblet().getJobId() + ":" + task.getTaskAttemptId()
- + "]";
+ return getName() + ": [" + task.getJoblet().getJobId() + ":" + task.getTaskAttemptId() + "]";
+ }
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index f277046..dd4a956 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -318,6 +318,6 @@
@Override
public String toString() {
- return getName() + " jobId:" + jobId;
+ return getName() + " jobId:" + jobId + " tasks:" + taskDescriptors.size();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 16461de..b79c3b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -73,8 +73,7 @@
// Get local resource
LocalResource lr = getResource();
if (lr == null) {
- LOGGER.error("index {} does not exist", resourceRef.getRelativePath());
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourceRef.getRelativePath());
}
IResource resource = lr.getResource();
index = resource.createInstance(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index c9505a6..ab301a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -196,7 +196,7 @@
public void unregister(String resourcePath) throws HyracksDataException {
IndexInfo info = indexInfos.get(resourcePath);
if (info == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
}
if (info.referenceCount != 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 4ee1245..966d6d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -145,7 +145,7 @@
// However, we cannot throw an exception here to be compatible with legacy datasets.
// In this case, the disk component would always get a garbage Id [-1, -1], which makes the
// component Id-based optimization useless but still correct.
- LOGGER.warn("Component Id not found from disk component metadata");
+ LOGGER.warn("component id {} not found from disk component metadata {}", componentId, getIndex());
}
return componentId;
}
@@ -160,9 +160,7 @@
@Override
public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Marked as valid component with id: " + getId());
- }
+ LOGGER.debug("marked {} as valid component with id {}", getIndex(), getId());
}
@Override
@@ -247,6 +245,7 @@
@Override
public String toString() {
- return "{\"class\":" + getClass().getSimpleName() + "\", \"index\":" + getIndex().toString() + "}";
+ return "{\"class\":" + getClass().getSimpleName() + "\", \"id\":" + componentId + ", \"index\":" + getIndex()
+ + "}";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index d9a3371..dfce00f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -715,7 +715,8 @@
return "{\"class\" : \"" + getClass().getSimpleName() + "\", \"dir\" : \"" + fileManager.getBaseDir()
+ "\", \"memory\" : " + (memoryComponents == null ? 0 : memoryComponents) + ", \"disk\" : "
+ diskComponents.size() + ", \"num-scheduled-flushes\":" + numScheduledFlushes
- + ", \"current-memory-component\":" + currentMutableComponentId.get() + "}";
+ + ", \"current-memory-component\":"
+ + (currentMutableComponentId == null ? "" : currentMutableComponentId.get()) + "}";
}
@Override
@@ -897,11 +898,8 @@
if (!memoryComponent.isModified() || opCtx.getOperation() == IndexOperation.DELETE_COMPONENTS) {
return EmptyComponent.INSTANCE;
}
- if (LOGGER.isInfoEnabled()) {
- FlushOperation flushOp = (FlushOperation) operation;
- LOGGER.log(Level.INFO,
- "Flushing component with id: " + flushOp.getFlushingComponent().getId() + " in the index " + this);
- }
+ LOGGER.debug("flushing component with id {} in the index {}",
+ ((FlushOperation) operation).getFlushingComponent().getId(), this);
return doFlush(operation);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 8d37d97..7088791 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -337,9 +337,7 @@
throw new IllegalStateException(
this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId);
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Component Id was reset from " + this.componentId + " to " + componentId);
- }
+ LOGGER.debug("component id of {} was reset from {} to {}", getIndex(), this.componentId, componentId);
this.componentId = componentId;
if (componentId != null) {
LSMComponentIdUtils.persist(this.componentId, metadata);
@@ -355,6 +353,6 @@
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\", \"state\":\"" + state + "\", \"writers\":"
+ writerCount + ", \"readers\":" + readerCount + ", \"pendingFlushes\":" + pendingFlushes
- + ", \"id\":\"" + componentId + "\"}";
+ + ", \"id\":\"" + componentId + "\", \"index\":" + getIndex() + "}";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 3ea0f49..9a112ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -152,4 +152,9 @@
public int getReaderCount() {
return 0;
}
+
+ @Override
+ public String toString() {
+ return "EmptyComponent";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 461d416..cdf7ad7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -532,18 +532,19 @@
}
}
}
+ ILSMDiskComponent newComponent;
try {
doIo(operation);
} finally {
- exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.FLUSH, operation.getNewComponent(),
+ newComponent = operation.getNewComponent();
+ exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.FLUSH, newComponent,
operation.getStatus() == LSMIOOperationStatus.FAILURE);
opTracker.completeOperation(lsmIndex, LSMOperationType.FLUSH,
operation.getAccessor().getOpContext().getSearchOperationCallback(),
operation.getAccessor().getOpContext().getModificationCallback());
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Finished the flush operation for index: {}. Result: {}", lsmIndex, operation.getStatus());
- }
+ LOGGER.debug("Finished the flush operation for {}. Result: {}",
+ (newComponent == null ? lsmIndex : newComponent), operation.getStatus());
}
public void doIo(ILSMIOOperation operation) {
@@ -584,24 +585,25 @@
public void merge(ILSMIOOperation operation) throws HyracksDataException {
if (LOGGER.isDebugEnabled()) {
MergeOperation mergeOp = (MergeOperation) operation;
- LOGGER.debug("Started a merge operation (number of merging components {}) for index: {}",
+ LOGGER.debug("Started a merge operation (number of merging components {}) for index {}",
mergeOp.getMergingComponents().size(), lsmIndex);
}
synchronized (opTracker) {
enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE);
}
+ ILSMDiskComponent newComponent;
try {
doIo(operation);
} finally {
- exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE, operation.getNewComponent(),
+ newComponent = operation.getNewComponent();
+ exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE, newComponent,
operation.getStatus() == LSMIOOperationStatus.FAILURE);
opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE,
operation.getAccessor().getOpContext().getSearchOperationCallback(),
operation.getAccessor().getOpContext().getModificationCallback());
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Finished the merge operation for index: {}. Result: {}", lsmIndex, operation.getStatus());
- }
+ LOGGER.debug("Finished the merge operation for {}. Result: {}",
+ (newComponent == null ? lsmIndex : newComponent), operation.getStatus());
}
@Override
@@ -785,6 +787,8 @@
ioOperation = scheduleFlush(ctx);
} else {
// since we're not deleting the memory component, we can't delete any previous component
+ LOGGER.debug("not deleting any components of {} since memory component {} won't be deleted", lsmIndex,
+ memComponent);
return;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 1ff9fa8..842ec61 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -34,7 +34,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -80,11 +79,7 @@
*/
public static void get(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value)
throws HyracksDataException {
- boolean loggable = LOGGER.isDebugEnabled();
value.reset();
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index);
- }
// Lock the opTracker to ensure index components don't change
synchronized (index.getOperationTracker()) {
ILSMMemoryComponent cmc = index.getCurrentMemoryComponent();
@@ -92,33 +87,17 @@
index.getCurrentMemoryComponent().getMetadata().get(key, value);
}
if (value.getLength() == 0) {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was not found in mutable memory component of " + index);
- }
- // was not found in the in current mutable component, search in the other in memory components
+ // was not found in the current mutable component, search in the other in-memory components
fromImmutableMemoryComponents(index, key, value);
if (value.getLength() == 0) {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was not found in all immmutable memory components of " + index);
- }
- // was not found in the in all in memory components, search in the disk components
+ // was not found in all in-memory components, search in the disk components
fromDiskComponents(index, key, value);
- if (loggable) {
- if (value.getLength() == 0) {
- LOGGER.log(Level.DEBUG, key + " was not found in all disk components of " + index);
- } else {
- LOGGER.log(Level.DEBUG, key + " was found in disk components of " + index);
- }
- }
- } else {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was found in the immutable memory components of " + index);
+ if (value.getLength() == 0) {
+ LOGGER.debug("{} was NOT found", key);
}
}
} else {
- if (loggable) {
- LOGGER.log(Level.DEBUG, key + " was found in mutable memory component of " + index);
- }
+ LOGGER.debug("{} was found in mutable memory component {}", key, cmc);
}
}
}
@@ -143,17 +122,11 @@
private static void fromDiskComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value)
throws HyracksDataException {
- boolean loggable = LOGGER.isDebugEnabled();
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components of " + index);
- }
for (ILSMDiskComponent c : index.getDiskComponents()) {
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components " + c);
- }
c.getMetadata().get(key, value);
if (value.getLength() != 0) {
// Found
+ LOGGER.debug("{} was found in disk component {}", key, c);
return;
}
}
@@ -161,21 +134,10 @@
private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key,
ArrayBackedValueStorage value) throws HyracksDataException {
- boolean loggable = LOGGER.isDebugEnabled();
- if (loggable) {
- LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable memory components of " + index);
- }
List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
int numOtherMemComponents = memComponents.size() - 1;
int next = index.getCurrentMemoryComponentIndex();
- if (loggable) {
- LOGGER.log(Level.DEBUG, index + " has " + numOtherMemComponents + " immutable memory components");
- }
for (int i = 0; i < numOtherMemComponents; i++) {
- if (loggable) {
- LOGGER.log(Level.DEBUG,
- "trying to get " + key + " from immutable memory components number: " + (i + 1));
- }
next = next - 1;
if (next < 0) {
next = memComponents.size() - 1;
@@ -185,6 +147,7 @@
c.getMetadata().get(key, value);
if (value.getLength() != 0) {
// Found
+ LOGGER.debug("{} was found in immutable memory component {}", key, c);
return;
}
}