[NO ISSUE][ING] Stop Succeeds When Feed Fails

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Before this change, if a feed fails after stop is issued, it
  goes to a failure state and the stop request will be stuck
  waiting for the feed to go to a stopped state.
- With this change, a failure in the feed will cause it to go
  to a stopped state if a stop request was issued.
- Test case is added

Change-Id: I486e11a339c1dfadc6a2a357c95a83dfc05772a1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2258
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 8cbc109..12bcc0d2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -193,9 +193,9 @@
         if (jobStatus.equals(JobStatus.FAILURE)) {
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
-            setState(ActivityState.TEMPORARILY_FAILED);
+            setState((state == ActivityState.STOPPING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED);
             if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING
-                    && prevState != ActivityState.RESUMING) {
+                    && prevState != ActivityState.RESUMING && prevState != ActivityState.STOPPING) {
                 recover();
             }
         } else {
@@ -551,4 +551,10 @@
     public String getDisplayName() throws HyracksDataException {
         return this.getEntityId().toString();
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":\"" + getClass().getSimpleName() + "\"" + "\"entityId\":\"" + entityId + "\""
+                + "\"state\":\"" + state + "\"" + "}";
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 40d4f6a..093d150 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -346,6 +346,27 @@
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
     }
 
+    @Test
+    public void testStopFromRunningAndJobFails() throws Exception {
+        testStartWhenStartSucceed();
+        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+        listener.onStop(Behavior.STEP_SUCCEED);
+        Action stopping = users[1].stopActivity(listener);
+        // wait for notification from listener
+        synchronized (listener) {
+            listener.wait();
+        }
+        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
+                Collections.singletonList(new HyracksDataException("Runtime Failure")));
+        Assert.assertNull(listener.getRecoveryTask());
+        listener.allowStep();
+        stopping.sync();
+        Assert.assertFalse(stopping.hasFailed());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+        Assert.assertNull(listener.getRecoveryTask());
+
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void testRecovery() throws Exception {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index c50a4a2..fcd18d0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -34,4 +34,9 @@
     protected void handle(Action action) throws Exception {
         action.execute(actorMdProvider);
     }
+
+    @Override
+    public String toString() {
+        return "{\"name\":\"" + name + "\"}";
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 1cedc96..7e1bc37 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -145,13 +145,14 @@
     @SuppressWarnings("deprecation")
     @Override
     protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+        ActivityState intention = state;
         step(onStop);
         failCompile(onStop);
         try {
             Set<ActivityState> waitFor;
-            if (state == ActivityState.STOPPING) {
+            if (intention == ActivityState.STOPPING) {
                 waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED);
-            } else if (state == ActivityState.SUSPENDING) {
+            } else if (intention == ActivityState.SUSPENDING) {
                 waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
             } else {
                 throw new IllegalStateException("stop with what intention??");
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index 9eb3b8e..ccf0163 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -29,7 +29,7 @@
 public abstract class SingleThreadEventProcessor<T> implements Runnable {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final String name;
+    protected final String name;
     private final LinkedBlockingQueue<T> eventInbox;
     private volatile Thread executorThread;
     private volatile boolean stopped = false;