[ASTERIXDB-1970][ING] Fix Active Stats Test
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Active Stats Test fails because it issues many async
calls and doesn't wait until the calls complete. In
addition, it bypass the active event inbox incorrectly.
Change-Id: I518a6b1f7d8e86703ee5537869d207e609a7c293
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1865
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index b4ed8e5..d2b8a89 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -107,6 +107,7 @@
LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : "Inactive"));
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
if (listener != null) {
+ // It is okay to bypass the event inbox in this case because we know this is the first event for this entity
listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
}
LOGGER.log(Level.FINER, "Listener was notified" + jobId);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
similarity index 82%
rename from asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 2dc1782..e932006 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -19,11 +19,14 @@
package org.apache.asterix.test.active;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.asterix.active.ActiveEvent;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveRuntime;
import org.apache.asterix.active.message.ActivePartitionMessage;
@@ -31,12 +34,12 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.junit.Assert;
@@ -44,11 +47,9 @@
import org.junit.Test;
import org.mockito.Mockito;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
-public class ActiveMessageTest {
+public class ActiveStatsTest {
protected boolean cleanUp = true;
private static String EXPECTED_STATS = "Mock stats";
@@ -59,7 +60,7 @@
}
@Test
- public void refreshStatsTest() throws HyracksException {
+ public void refreshStatsTest() throws Exception {
// Entities to be used
EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity");
ActiveRuntimeId activeRuntimeId =
@@ -85,9 +86,8 @@
.thenReturn(entityId);
// Add event listener
- ActiveEntityEventsListener eventsListener =
- new ActiveEntityEventsListener(appCtx, entityId, datasetList, partitionConstraint,
- FeedIntakeOperatorNodePushable.class.getSimpleName());
+ ActiveEntityEventsListener eventsListener = new ActiveEntityEventsListener(appCtx, entityId, datasetList,
+ partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName());
activeJobNotificationHandler.registerListener(eventsListener);
// Register mock runtime
@@ -103,19 +103,22 @@
eventsListener.refreshStats(1000);
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.equals("N/A"));
-
+ WaitForStateSubscriber startingSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTING);
+ eventsListener.subscribe(startingSubscriber);
// Update stats of created/started job without joined partition
activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
activeLifecycleListener.notifyJobStart(jobId);
+ startingSubscriber.sync();
eventsListener.refreshStats(1000);
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.equals("N/A"));
-
// Fake partition message and notify eventListener
- ActivePartitionMessage partitionMessage =
- new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED,
- null);
- eventsListener.notify(new ActiveEvent(jobId, ActiveEvent.Kind.PARTITION_EVENT, entityId, partitionMessage));
+ WaitForStateSubscriber startedSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTED);
+ eventsListener.subscribe(startedSubscriber);
+ ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId,
+ ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+ partitionMessage.handle(appCtx);
+ startedSubscriber.sync();
eventsListener.refreshStats(100000);
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.contains(EXPECTED_STATS));
@@ -134,8 +137,8 @@
} catch (HyracksDataException e) {
expectedException = e;
}
- Assert.assertTrue(expectedException != null
- && expectedException.getErrorCode() == ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME);
+ Assert.assertNotNull(expectedException);
+ Assert.assertEquals(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, expectedException.getErrorCode());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d264008..7c82ca3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -68,7 +68,6 @@
public static final int POLYGON_3_POINTS = 25;
public static final int POLYGON_INVALID = 26;
-
public static final int INSTANTIATION_ERROR = 100;
// Compilation errors
@@ -195,12 +194,11 @@
public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED = 3081;
public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC = 3082;
public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3083;
- public static final int CANNOT_WAIT_FOR_STATE = 3084;
+ public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3084;
public static final int FEED_UNKNOWN_ADAPTER_NAME = 3085;
public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088;
- public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3089;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 11334f1..facf1a9 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -183,12 +183,11 @@
3081 = socket is not properly configured
3082 = Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s
3083 = Duplicate feed adaptor name: %1$s
-3084 = Cannot wait for state %1$s. The only states that can be waited for are STARTED or STOPPED
+3084 = Cannot subscribe to events of a failed active entity
3085 = Unknown Adapter Name.
3086 = Cannot find record reader %1$s with specified configuration
3087 = Cannot find function %1$s
3088 = %1$s is not a valid runtime Id
-3089 = Cannot subscribe to events of a failed active entity
# Lifecycle management errors
4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
index cee6fa9..409c297 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -88,6 +88,7 @@
this.numRegistered = 0;
}
+ @Override
public synchronized void notify(ActiveEvent event) {
try {
LOGGER.finer("EventListener is notified.");
@@ -191,8 +192,11 @@
@SuppressWarnings("unchecked")
@Override
public void refreshStats(long timeout) throws HyracksDataException {
+ LOGGER.log(Level.INFO, "refreshStats called");
synchronized (this) {
if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) {
+ LOGGER.log(Level.INFO, "returning immediately since state = " + state + " and statsRequestState = "
+ + statsRequestState);
return;
} else {
statsRequestState = RequestState.STARTED;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index ea7e3ae..7bab421 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -33,9 +33,6 @@
throws HyracksDataException {
super(listener);
this.targetState = targetState;
- if (targetState != ActivityState.STARTED && targetState != ActivityState.STOPPED) {
- throw new RuntimeDataException(ErrorCode.CANNOT_WAIT_FOR_STATE, targetState);
- }
listener.subscribe(this);
}