[ASTERIXDB-1970][ING] Fix Active Stats Test

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Active Stats Test fails because it issues many async
  calls and doesn't wait until the calls complete. In
  addition, it bypass the active event inbox incorrectly.

Change-Id: I518a6b1f7d8e86703ee5537869d207e609a7c293
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1865
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index b4ed8e5..d2b8a89 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -107,6 +107,7 @@
         LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : "Inactive"));
         IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
         if (listener != null) {
+            // It is okay to bypass the event inbox in this case because we know this is the first event for this entity
             listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
         }
         LOGGER.log(Level.FINER, "Listener was notified" + jobId);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
similarity index 82%
rename from asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 2dc1782..e932006 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.asterix.test.active;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.asterix.active.ActiveEvent;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.active.message.ActivePartitionMessage;
@@ -31,12 +34,12 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.junit.Assert;
@@ -44,11 +47,9 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class ActiveMessageTest {
+public class ActiveStatsTest {
 
     protected boolean cleanUp = true;
     private static String EXPECTED_STATS = "Mock stats";
@@ -59,7 +60,7 @@
     }
 
     @Test
-    public void refreshStatsTest() throws HyracksException {
+    public void refreshStatsTest() throws Exception {
         // Entities to be used
         EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity");
         ActiveRuntimeId activeRuntimeId =
@@ -85,9 +86,8 @@
                 .thenReturn(entityId);
 
         // Add event listener
-        ActiveEntityEventsListener eventsListener =
-                new ActiveEntityEventsListener(appCtx, entityId, datasetList, partitionConstraint,
-                        FeedIntakeOperatorNodePushable.class.getSimpleName());
+        ActiveEntityEventsListener eventsListener = new ActiveEntityEventsListener(appCtx, entityId, datasetList,
+                partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName());
         activeJobNotificationHandler.registerListener(eventsListener);
 
         // Register mock runtime
@@ -103,19 +103,22 @@
         eventsListener.refreshStats(1000);
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.equals("N/A"));
-
+        WaitForStateSubscriber startingSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTING);
+        eventsListener.subscribe(startingSubscriber);
         // Update stats of created/started job without joined partition
         activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
         activeLifecycleListener.notifyJobStart(jobId);
+        startingSubscriber.sync();
         eventsListener.refreshStats(1000);
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.equals("N/A"));
-
         // Fake partition message and notify eventListener
-        ActivePartitionMessage partitionMessage =
-                new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED,
-                        null);
-        eventsListener.notify(new ActiveEvent(jobId, ActiveEvent.Kind.PARTITION_EVENT, entityId, partitionMessage));
+        WaitForStateSubscriber startedSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTED);
+        eventsListener.subscribe(startedSubscriber);
+        ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId,
+                ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+        partitionMessage.handle(appCtx);
+        startedSubscriber.sync();
         eventsListener.refreshStats(100000);
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains(EXPECTED_STATS));
@@ -134,8 +137,8 @@
         } catch (HyracksDataException e) {
             expectedException = e;
         }
-        Assert.assertTrue(expectedException != null
-                && expectedException.getErrorCode() == ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME);
+        Assert.assertNotNull(expectedException);
+        Assert.assertEquals(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, expectedException.getErrorCode());
     }
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d264008..7c82ca3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -68,7 +68,6 @@
     public static final int POLYGON_3_POINTS = 25;
     public static final int POLYGON_INVALID = 26;
 
-
     public static final int INSTANTIATION_ERROR = 100;
 
     // Compilation errors
@@ -195,12 +194,11 @@
     public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED = 3081;
     public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC = 3082;
     public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3083;
-    public static final int CANNOT_WAIT_FOR_STATE = 3084;
+    public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3084;
     public static final int FEED_UNKNOWN_ADAPTER_NAME = 3085;
     public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
     public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
     public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088;
-    public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3089;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 11334f1..facf1a9 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -183,12 +183,11 @@
 3081 = socket is not properly configured
 3082 = Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s
 3083 = Duplicate feed adaptor name: %1$s
-3084 = Cannot wait for state %1$s. The only states that can be waited for are STARTED or STOPPED
+3084 = Cannot subscribe to events of a failed active entity
 3085 = Unknown Adapter Name.
 3086 = Cannot find record reader %1$s with specified configuration
 3087 = Cannot find function %1$s
 3088 = %1$s is not a valid runtime Id
-3089 = Cannot subscribe to events of a failed active entity
 
 # Lifecycle management errors
 4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
index cee6fa9..409c297 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -88,6 +88,7 @@
         this.numRegistered = 0;
     }
 
+    @Override
     public synchronized void notify(ActiveEvent event) {
         try {
             LOGGER.finer("EventListener is notified.");
@@ -191,8 +192,11 @@
     @SuppressWarnings("unchecked")
     @Override
     public void refreshStats(long timeout) throws HyracksDataException {
+        LOGGER.log(Level.INFO, "refreshStats called");
         synchronized (this) {
             if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) {
+                LOGGER.log(Level.INFO, "returning immediately since state = " + state + " and statsRequestState = "
+                        + statsRequestState);
                 return;
             } else {
                 statsRequestState = RequestState.STARTED;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index ea7e3ae..7bab421 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -33,9 +33,6 @@
             throws HyracksDataException {
         super(listener);
         this.targetState = targetState;
-        if (targetState != ActivityState.STARTED && targetState != ActivityState.STOPPED) {
-            throw new RuntimeDataException(ErrorCode.CANNOT_WAIT_FOR_STATE, targetState);
-        }
         listener.subscribe(this);
     }