[NO ISSUE][*DB][ACT] Active stats synchronization
Avoid locks on stats refresh requests on non-running active entities
Change-Id: I458f15cd4b199576b3236762c6da904f086147fd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8185
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index e01d0a7..3c2f8e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -34,7 +34,7 @@
void notify(ActiveEvent event);
/**
- * Checkcs whether the subscriber is done receiving events
+ * Checks whether the subscriber is done receiving events
*
* @return
*/
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ca610aa..8338b2b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -75,7 +75,7 @@
* refresh the stats
*
* @param timeout
- * @throws HyracksDataException
+ * @throws HyracksDataException with error code ASX3118 if the active entity is not currently running
*/
void refreshStats(long timeout) throws HyracksDataException;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index b8c44a6..1a2af13 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -56,6 +56,6 @@
@Override
public String toString() {
- return ActiveManagerMessage.class.getSimpleName();
+ return getClass().getSimpleName();
}
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 0dbba52..117a68c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -29,7 +29,17 @@
this.reqId = reqId;
}
+ @Override
+ public boolean isWhispered() {
+ return true;
+ }
+
public long getReqId() {
return reqId;
}
+
+ @Override
+ public String toString() {
+ return "ActiveStatsRequestMessage{" + "reqId=" + reqId + '}';
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 882afc5..39ebdf7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.active;
+import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -93,7 +95,7 @@
protected ActivityState prevState;
protected JobId jobId;
protected volatile long statsTimestamp;
- protected String stats;
+ protected volatile String stats;
protected volatile boolean isFetchingStats;
protected int numRegistered;
protected int numDeRegistered;
@@ -292,12 +294,17 @@
@Override
public void refreshStats(long timeout) throws HyracksDataException {
LOGGER.log(level, "refreshStats called");
+ // first check the state & whether a fetch is already in progress, outside of the lock; in the event we
+ // are recovering, it may take some time to obtain the lock...
+ ensureRunning();
+ if (isFetchingStats) {
+ LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats);
+ return;
+ }
synchronized (this) {
- if (state != ActivityState.RUNNING) {
- LOGGER.log(level, "returning immediately since state = " + state);
- notifySubscribers(statsUpdatedEvent);
- return;
- } else if (isFetchingStats) {
+ // now that we have the lock, again verify the state & ensure we are not already fetching new stats
+ ensureRunning();
+ if (isFetchingStats) {
LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats);
return;
} else {
@@ -323,6 +330,12 @@
isFetchingStats = false;
}
+ protected void ensureRunning() throws RuntimeDataException {
+ if (state != ActivityState.RUNNING) {
+ throw new RuntimeDataException(ACTIVE_ENTITY_NOT_RUNNING, runtimeName, String.valueOf(state).toLowerCase());
+ }
+ }
+
protected synchronized void notifySubscribers(ActiveEvent event) {
notifyAll();
Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 80dde8a..ca5e1e6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -19,6 +19,9 @@
package org.apache.asterix.test.active;
+import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING;
+import static org.apache.asterix.common.exceptions.ErrorCode.ASTERIX;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -116,7 +119,13 @@
Assert.assertTrue(requestedStats.contains("N/A"));
// Update stats of not-started job
- eventsListener.refreshStats(1000);
+ try {
+ eventsListener.refreshStats(1000);
+ Assert.fail("expected exception on refresh stats on not-started job");
+ } catch (HyracksDataException e) {
+ Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e,
+ e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING));
+ }
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.contains("N/A"));
WaitForStateSubscriber startingSubscriber =
@@ -127,8 +136,12 @@
startingSubscriber.sync();
activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
activeJobNotificationHandler.notifyJobStart(jobId);
- eventsListener.refreshStats(1000);
- requestedStats = eventsListener.getStats();
+ try {
+ eventsListener.refreshStats(1000);
+ } catch (HyracksDataException e) {
+ Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e,
+ e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING));
+ }
Assert.assertTrue(requestedStats.contains("N/A"));
// Fake partition message and notify eventListener
ActivePartitionMessage partitionMessage =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d57c433..1d18cf6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -316,6 +316,7 @@
public static final int FAILED_TO_PARSE_METADATA = 3115;
public static final int INPUT_DECODE_FAILURE = 3116;
public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
+ public static final int ACTIVE_ENTITY_NOT_RUNNING = 3118;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 5cf3968..a2780ba 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -314,6 +314,7 @@
3115 = Failed to parse record metadata
3116 = Failed to decode input
3117 = Failed to parse record, malformed log record
+3118 = Active Entity %1$s is not running (it is %2$s)
# Lifecycle management errors
4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 37b157e..726880d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.feed.watch;
+import java.util.Objects;
+
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.hyracks.util.Span;
@@ -25,11 +27,19 @@
public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
protected final IActiveEntityEventsListener listener;
+ private final Object lockObject;
private volatile boolean done = false;
private volatile Exception failure = null;
+ public AbstractSubscriber(IActiveEntityEventsListener listener, Object lockObject) {
+ Objects.requireNonNull(lockObject);
+ this.listener = listener;
+ this.lockObject = lockObject;
+ }
+
public AbstractSubscriber(IActiveEntityEventsListener listener) {
this.listener = listener;
+ this.lockObject = this;
}
@Override
@@ -38,28 +48,28 @@
}
public void complete(Exception failure) {
- synchronized (listener) {
+ synchronized (lockObject) {
if (failure != null) {
this.failure = failure;
}
done = true;
- listener.notifyAll();
+ lockObject.notifyAll();
}
}
@Override
public void sync() throws InterruptedException {
- synchronized (listener) {
+ synchronized (lockObject) {
while (!done) {
- listener.wait();
+ lockObject.wait();
}
}
}
public boolean sync(Span span) throws InterruptedException {
- synchronized (listener) {
+ synchronized (lockObject) {
while (!done) {
- span.wait(listener);
+ span.wait(lockObject);
if (done || span.elapsed()) {
return done;
}
@@ -71,4 +81,11 @@
public Exception getFailure() {
return failure;
}
+
+ protected void reset() {
+ synchronized (lockObject) {
+ done = false;
+ failure = null;
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index 818d826..4dc86ac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -29,7 +29,7 @@
private final Set<ActivityState> targetStates;
public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) {
- super(listener);
+ super(listener, listener);
this.targetStates = targetStates;
listener.subscribe(this);
}