[NO ISSUE][ING] Cleanup active listener

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Merge STOPPED and PERMANENTLY_FAILED into a single
  state (STOPPED) since they are equivalent.
- Allow timeout of start and stop of active entities.
- Add test cases for timeout scenarios.
- Refactor common code into the abstract listener class.

Change-Id: I7f3b14aec46728fbe8b256b915d0e30992b2fe47
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2618
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index eb43d10..1f3daac 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -24,11 +24,6 @@
      */
     STOPPED,
     /**
-     * Failure to recover from a temporary faliure caused the activity to fail permanantly.
-     * No further recovery attempts will be made.
-     */
-    PERMANENTLY_FAILED,
-    /**
      * An unexpected failure caused the activity to fail but recovery attempts will start taking place
      */
     TEMPORARILY_FAILED,
@@ -41,6 +36,10 @@
      */
     STARTING,
     /**
+     * The activity was started but is being cancelled. Waiting for the job cancellation to complete
+     */
+    CANCELLING,
+    /**
      * The activity has been started successfully and is running
      */
     RUNNING,
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index 142e84b..e01d0a7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -31,7 +31,7 @@
      * @param event
      * @throws HyracksDataException
      */
-    void notify(ActiveEvent event) throws HyracksDataException;
+    void notify(ActiveEvent event);
 
     /**
      * Checkcs whether the subscriber is done receiving events
@@ -53,5 +53,5 @@
      * @param eventsListener
      * @throws HyracksDataException
      */
-    void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException;
+    void subscribed(IActiveEntityEventsListener eventsListener);
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 37120e4..2d36bb2 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -56,9 +56,8 @@
      * subscribe to events. subscription ends when subscriber.done() returns true
      *
      * @param subscriber
-     * @throws HyracksDataException
      */
-    void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException;
+    void subscribe(IActiveEntityEventSubscriber subscriber);
 
     /**
      * The most recent acquired stats for the active entity
@@ -99,6 +98,7 @@
 
     /**
      * Get the stats name that's used to form the stats JSON for the active entity
+     *
      * @return the customized stats name for current active entity
      */
     String getDisplayName() throws HyracksDataException;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 2a214e3..66d3e81 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -23,8 +23,10 @@
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
@@ -34,6 +36,7 @@
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.ActiveStatsRequestMessage;
@@ -55,6 +58,9 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -65,7 +71,7 @@
     private static final Level level = Level.INFO;
     private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null);
     private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING,
-            ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING);
+            ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING, ActivityState.CANCELLING);
     private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
     // finals
     protected final IClusterStateManager clusterStateManager;
@@ -129,7 +135,10 @@
         LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state);
         this.prevState = state;
         this.state = newState;
-        if (newState == ActivityState.SUSPENDED) {
+        if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING
+                || newState == ActivityState.RESUMING) {
+            jobFailure = null;
+        } else if (newState == ActivityState.SUSPENDED) {
             suspended = true;
         }
         notifySubscribers(STATE_CHANGED);
@@ -142,10 +151,7 @@
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
-                    jobCreated(event);
-                    break;
                 case JOB_STARTED:
-                    start(event);
                     break;
                 case JOB_FINISHED:
                     finish(event);
@@ -163,14 +169,10 @@
         }
     }
 
-    protected void jobCreated(ActiveEvent event) {
-        // Do nothing
-    }
-
     protected synchronized void handle(ActivePartitionMessage message) {
         if (message.getEvent() == Event.RUNTIME_REGISTERED) {
             numRegistered++;
-            if (numRegistered == locations.getLocations().length) {
+            if (allPartitionsRegisteredAndNotCancelling()) {
                 setState(ActivityState.RUNNING);
             }
         } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
@@ -178,6 +180,10 @@
         }
     }
 
+    private boolean allPartitionsRegisteredAndNotCancelling() {
+        return numRegistered == locations.getLocations().length && state != ActivityState.CANCELLING;
+    }
+
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
         LOGGER.log(level, "the job " + jobId + " finished");
@@ -191,10 +197,11 @@
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
         LOGGER.log(level, "The job finished with status: " + jobStatus);
-        if (jobStatus.equals(JobStatus.FAILURE)) {
+        if (!jobSuccessfullyTerminated(jobStatus)) {
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
-            setState((state == ActivityState.STOPPING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED);
+            setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED
+                    : ActivityState.TEMPORARILY_FAILED);
             if (prevState == ActivityState.RUNNING) {
                 recover();
             }
@@ -203,14 +210,12 @@
         }
     }
 
-    protected void start(ActiveEvent event) {
-        jobId = event.getJobId();
-        numRegistered = 0;
-        numDeRegistered = 0;
+    private boolean jobSuccessfullyTerminated(JobStatus jobStatus) {
+        return jobStatus.equals(JobStatus.TERMINATED);
     }
 
     @Override
-    public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
+    public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) {
         subscriber.subscribed(this);
         if (!subscriber.isDone()) {
             subscribers.add(subscriber);
@@ -315,11 +320,7 @@
             if (subscriber.isDone()) {
                 it.remove();
             } else {
-                try {
-                    subscriber.notify(event);
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
-                }
+                subscriber.notify(event);
                 if (subscriber.isDone()) {
                     it.remove();
                 }
@@ -342,22 +343,12 @@
         }
     }
 
-    /**
-     * this method is called before an action call is returned. It ensures that the request didn't fail
-     *
-     */
-    protected synchronized void checkNoFailure() throws HyracksDataException {
-        if (state == ActivityState.PERMANENTLY_FAILED) {
-            throw HyracksDataException.create(jobFailure);
-        }
-    }
-
     @Override
     public synchronized void recover() {
         LOGGER.log(level, "Recover is called on " + entityId);
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
             LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
-            setState(ActivityState.PERMANENTLY_FAILED);
+            setState(ActivityState.STOPPED);
         } else {
             ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
             setState(ActivityState.TEMPORARILY_FAILED);
@@ -371,7 +362,7 @@
     public synchronized void start(MetadataProvider metadataProvider)
             throws HyracksDataException, InterruptedException {
         waitForNonTransitionState();
-        if (state != ActivityState.PERMANENTLY_FAILED && state != ActivityState.STOPPED) {
+        if (state != ActivityState.STOPPED) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED, entityId, state);
         }
         try {
@@ -379,30 +370,131 @@
             doStart(metadataProvider);
             setRunning(metadataProvider, true);
         } catch (Exception e) {
-            setState(ActivityState.PERMANENTLY_FAILED);
+            setState(ActivityState.STOPPED);
             LOGGER.log(Level.ERROR, "Failed to start the entity " + entityId, e);
             throw HyracksDataException.create(e);
         }
     }
 
-    protected abstract void doStart(MetadataProvider metadataProvider) throws HyracksDataException;
+    @SuppressWarnings("squid:S1181")
+    protected synchronized void doStart(MetadataProvider metadataProvider) throws HyracksDataException {
+        WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
+                EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.STOPPED));
+        jobId = compileAndStartJob(metadataProvider);
+        numRegistered = 0;
+        numDeRegistered = 0;
+        try {
+            subscriber.sync();
+            if (subscriber.getFailure() != null) {
+                throw subscriber.getFailure();
+            }
+        } catch (InterruptedException ie) {
+            // interrupted.. check if the subscriber is done
+            if (subscriber.isDone()) {
+                if (subscriber.getFailure() != null) {
+                    throw HyracksDataException.create(subscriber.getFailure());
+                }
+                Thread.currentThread().interrupt();
+            } else {
+                // Subscriber is not done yet. so, we need to cancel, we have the jobId
+                setState(ActivityState.CANCELLING);
+                cancelJob(ie);
+                throw HyracksDataException.create(ie);
+            }
+        } catch (Throwable e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 
-    protected abstract Void doStop(MetadataProvider metadataProvider) throws HyracksDataException;
+    private void cancelJob(Throwable th) {
+        cancelJobSafely(metadataProvider, th);
+        final WaitForStateSubscriber cancelSubscriber =
+                new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
+        final Span span = Span.start(2, TimeUnit.MINUTES);
+        InvokeUtil.doUninterruptibly(() -> {
+            if (!cancelSubscriber.sync(span)) {
+                ExitUtil.halt(ExitUtil.EC_FAILED_TO_CANCEL_ACTIVE_START_STOP);
+            }
+        });
+    }
 
-    protected abstract Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException;
+    @SuppressWarnings("squid:S1181")
+    protected void cancelJobSafely(MetadataProvider metadataProvider, Throwable e) {
+        try {
+            metadataProvider.getApplicationContext().getHcc().cancelJob(jobId);
+        } catch (Throwable th) {
+            LOGGER.warn("Failed to cancel active job", th);
+            e.addSuppressed(th);
+        }
+    }
+
+    protected abstract JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException;
+
+    @SuppressWarnings("squid:S1181")
+    protected synchronized void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+        ActivityState intention = state;
+        Set<ActivityState> waitFor;
+        if (intention == ActivityState.STOPPING) {
+            waitFor = EnumSet.of(ActivityState.STOPPED);
+        } else if (intention == ActivityState.SUSPENDING) {
+            waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
+        } else {
+            throw new IllegalStateException("stop with what intention?? Current state is " + intention);
+        }
+        WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor);
+        // Note: once we start sending stop messages, we can't go back until the entity is stopped
+        try {
+            sendStopMessages(metadataProvider);
+            LOGGER.log(Level.DEBUG, "Waiting for its state to become " + waitFor);
+            subscriber.sync();
+            LOGGER.log(Level.DEBUG, "Disconnect has been completed " + waitFor);
+        } catch (InterruptedException ie) {
+            forceStop(subscriber, ie);
+            Thread.currentThread().interrupt();
+        } catch (Throwable e) {
+            forceStop(subscriber, e);
+        }
+    }
+
+    private void forceStop(WaitForStateSubscriber subscriber, Throwable e) {
+        if (!subscriber.isDone()) {
+            cancelJob(e);
+        }
+        // Stop should not throw an exception if the entity was stopped..
+        // Simply log
+        LOGGER.warn("Failure encountered while stopping {}", this, e);
+    }
+
+    protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception {
+        ICcApplicationContext applicationCtx = metadataProvider.getApplicationContext();
+        ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker();
+        AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations();
+        int partition = 0;
+        LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
+        for (String location : runtimeLocations.getLocations()) {
+            LOGGER.log(Level.INFO, "Sending to " + location);
+            messageBroker.sendApplicationMessageToNC(
+                    new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, getActiveRuntimeId(partition++)),
+                    location);
+        }
+    }
+
+    protected abstract ActiveRuntimeId getActiveRuntimeId(int partition);
+
+    protected abstract void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException;
 
     protected abstract void doResume(MetadataProvider metadataProvider) throws HyracksDataException;
 
-    protected abstract void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException;
+    protected abstract void setRunning(MetadataProvider metadataProvider, boolean running);
 
     @Override
-    public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
+    public final synchronized void stop(MetadataProvider metadataProvider)
+            throws HyracksDataException, InterruptedException {
         waitForNonTransitionState();
-        if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
-                && state != ActivityState.TEMPORARILY_FAILED) {
+        if (state != ActivityState.RUNNING && state != ActivityState.TEMPORARILY_FAILED) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
         }
-        if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
+        if (state == ActivityState.TEMPORARILY_FAILED) {
             if (rt != null) {
                 setState(ActivityState.STOPPING);
                 rt.cancel();
@@ -419,11 +511,12 @@
             setState(ActivityState.STOPPING);
             try {
                 doStop(metadataProvider);
-                setRunning(metadataProvider, false);
             } catch (Exception e) {
-                setState(ActivityState.PERMANENTLY_FAILED);
+                setState(ActivityState.STOPPED);
                 LOGGER.log(Level.ERROR, "Failed to stop the entity " + entityId, e);
                 throw HyracksDataException.create(e);
+            } finally {
+                setRunning(metadataProvider, false);
             }
         } else {
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
@@ -445,7 +538,7 @@
             LOGGER.log(level, "Waiting for ongoing activities");
             waitForNonTransitionState();
             LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
-            if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
+            if (state == ActivityState.STOPPED) {
                 suspended = true;
                 return;
             }
@@ -461,7 +554,10 @@
             subscriber = new WaitForStateSubscriber(this,
                     EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED));
             suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
-                    .getExecutor().submit(() -> doSuspend(metadataProvider));
+                    .getExecutor().submit(() -> {
+                        doSuspend(metadataProvider);
+                        return null;
+                    });
             LOGGER.log(level, "Suspension task has been submitted");
         }
         try {
@@ -479,7 +575,7 @@
                         // restore state
                         setState(prevState);
                     } else {
-                        setState(ActivityState.PERMANENTLY_FAILED);
+                        setState(ActivityState.STOPPED);
                     }
                 }
                 throw HyracksDataException.create(e);
@@ -489,7 +585,7 @@
 
     @Override
     public synchronized void resume(MetadataProvider metadataProvider) throws HyracksDataException {
-        if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
+        if (state == ActivityState.STOPPED) {
             suspended = false;
             notifyAll();
             return;
@@ -517,7 +613,7 @@
 
     @Override
     public boolean isActive() {
-        return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED;
+        return state != ActivityState.STOPPED && state != ActivityState.CANCELLING;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index fbf644f..99ed42d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -21,21 +21,17 @@
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.utils.JobUtils;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
-import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.lang.common.statement.StartFeedStatement;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -45,9 +41,9 @@
 import org.apache.asterix.utils.FeedOperations;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class FeedEventsListener extends ActiveEntityEventsListener {
@@ -80,58 +76,43 @@
     }
 
     @Override
-    protected void doStart(MetadataProvider mdProvider) throws HyracksDataException {
+    public synchronized void start(MetadataProvider metadataProvider)
+            throws HyracksDataException, InterruptedException {
+        super.start(metadataProvider);
+        // Note: The current implementation of the wait for completion flag is problematic due to locking issues:
+        // Locks obtained during the start of the feed are not released, and so, the feed can't be stopped
+        // and also, read locks over dataverses, datasets, etc, are never released.
+        boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+        if (wait) {
+            IActiveEntityEventSubscriber stoppedSubscriber =
+                    new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
+            stoppedSubscriber.sync();
+        }
+    }
+
+    @Override
+    protected JobId compileAndStartJob(MetadataProvider mdProvider) throws HyracksDataException {
         try {
             Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
                     FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
             JobSpecification feedJob = jobInfo.getLeft();
-            WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING,
-                    ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
             feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
             // We will need to design general exception handling mechanism for feeds.
             setLocations(jobInfo.getRight());
-            boolean wait = Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
-            JobUtils.runJob(hcc, feedJob, false);
-            eventSubscriber.sync();
-            if (eventSubscriber.getFailure() != null) {
-                throw eventSubscriber.getFailure();
-            }
-            if (wait) {
-                IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this,
-                        EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
-                stoppedSubscriber.sync();
-            }
+            return JobUtils.runJob(hcc, feedJob, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
     }
 
     @Override
-    protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
-        IActiveEntityEventSubscriber eventSubscriber =
-                new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
-        try {
-            // Construct ActiveMessage
-            for (int i = 0; i < getLocations().getLocations().length; i++) {
-                String intakeLocation = getLocations().getLocations()[i];
-                FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId, intakeLocation,
-                        i);
-            }
-            eventSubscriber.sync();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-        return null;
-    }
-
-    @Override
-    protected void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException {
+    protected void setRunning(MetadataProvider metadataProvider, boolean running) {
         // No op
     }
 
     @Override
-    protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
+    protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
@@ -139,4 +120,9 @@
     protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
+
+    @Override
+    protected ActiveRuntimeId getActiveRuntimeId(int partition) {
+        return new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 3024dc6..9531b63 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -92,7 +92,7 @@
             if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
                 synchronized (listener) {
                     if (!cancelRecovery) {
-                        listener.setState(ActivityState.PERMANENTLY_FAILED);
+                        listener.setState(ActivityState.STOPPED);
                         listener.setRunning(metadataProvider, false);
                     }
                 }
@@ -172,7 +172,7 @@
                 }
                 if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
                     LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
-                    listener.setState(ActivityState.PERMANENTLY_FAILED);
+                    listener.setState(ActivityState.STOPPED);
                     listener.setRunning(metadataProvider, false);
                 }
                 listener.notifyAll();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dc057cb..593d7ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -29,10 +29,6 @@
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -42,7 +38,6 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
@@ -52,7 +47,6 @@
 import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
@@ -485,17 +479,4 @@
         return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections,
                 ingestionLocations), intakeInfo.getRight().getPartitionConstraint());
     }
-
-    public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation,
-            Integer partition) throws Exception {
-        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(Kind.STOP_ACTIVITY,
-                new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
-        SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
-    }
-
-    private static void SendActiveMessage(ICcApplicationContext appCtx, ActiveManagerMessage activeManagerMessage,
-            String nodeId) throws Exception {
-        ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
-        messageBroker.sendApplicationMessageToNC(activeManagerMessage, nodeId);
-    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 9612ead..6c95958 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -180,7 +180,7 @@
         Action action = users[0].startActivity(listener);
         action.sync();
         assertFailure(action, 0);
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
     @Test
@@ -190,7 +190,42 @@
         Action action = users[0].startActivity(listener);
         action.sync();
         assertFailure(action, 0);
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+    }
+
+    @Test
+    public void testStartWhenStartSucceedButTimesout() throws Exception {
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+        listener.onStart(Behavior.FAIL_START_TIMEOUT_OP_SUCCEED);
+        Action action = users[0].startActivity(listener);
+        action.sync();
+        assertSuccess(action);
+        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+    }
+
+    @Test
+    public void testStartWhenStartStuckTimesout() throws Exception {
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+        listener.onStart(Behavior.FAIL_START_TIMEOUT_STUCK);
+        Action action = users[0].startActivity(listener);
+        action.sync();
+        assertFailure(action, 0);
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+    }
+
+    @Test
+    public void testStopWhenStopTimesout() throws Exception {
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+        listener.onStart(Behavior.SUCCEED);
+        Action action = users[0].startActivity(listener);
+        action.sync();
+        assertSuccess(action);
+        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+        listener.onStop(Behavior.FAIL_STOP_TIMEOUT);
+        action = users[0].stopActivity(listener);
+        action.sync();
+        assertSuccess(action);
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
     @Test
@@ -336,14 +371,18 @@
     }
 
     @Test
-    public void testSuspendFromRunningAndStopFail() throws Exception {
+    public void testSuspendFromRunningAndStopFailThenResumeSucceeds() throws Exception {
         testStartWhenStartSucceed();
         // suspend
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
         listener.onStop(Behavior.FAIL_COMPILE);
         Action suspension = users[1].suspendActivity(listener);
         suspension.sync();
-        Assert.assertTrue(suspension.hasFailed());
+        Assert.assertFalse(suspension.hasFailed());
+        Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState());
+        Action resumption = users[1].resumeActivity(listener);
+        resumption.sync();
+        assertSuccess(resumption);
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
     }
 
@@ -492,19 +531,19 @@
         WaitForStateSubscriber tempFailSubscriber =
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
         WaitForStateSubscriber permFailSubscriber =
-                new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED));
+                new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
         listener.onStart(Behavior.FAIL_COMPILE);
         clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
                 Collections.singletonList(new HyracksDataException("Compilation Failure")));
         tempFailSubscriber.sync();
         permFailSubscriber.sync();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
     @Test
     public void testStartAfterPermenantFailure() throws Exception {
         testRecoveryFailureAfterOneAttemptCompilationFailure();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         listener.onStart(Behavior.SUCCEED);
         WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
         users[1].startActivity(listener);
@@ -536,13 +575,13 @@
         WaitForStateSubscriber tempFailSubscriber =
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
         WaitForStateSubscriber permFailSubscriber =
-                new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED));
+                new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
         listener.onStart(Behavior.FAIL_RUNTIME);
         clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
                 Collections.singletonList(new HyracksDataException("Runtime Failure")));
         tempFailSubscriber.sync();
         permFailSubscriber.sync();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
     @SuppressWarnings("deprecation")
@@ -555,12 +594,12 @@
         WaitForStateSubscriber tempFailSubscriber =
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
         WaitForStateSubscriber permFailSubscriber =
-                new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED));
+                new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
         clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
                 Collections.singletonList(new HyracksDataException("Runtime Failure")));
         tempFailSubscriber.sync();
         permFailSubscriber.sync();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
     @SuppressWarnings("deprecation")
@@ -925,7 +964,7 @@
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testCreateNewShadowDuringRecoveryAttemptThatSucceeds() throws Exception {
+    public void testCreateNewDatasetDuringRecoveryAttemptThatSucceeds() throws Exception {
         testStartWhenStartSucceed();
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
@@ -953,7 +992,7 @@
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testCreateNewShadowDuringRecoveryAttemptThatFailsCompile() throws Exception {
+    public void testCreateNewDatasetDuringRecoveryAttemptThatFailsCompile() throws Exception {
         testStartWhenStartSucceed();
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
@@ -980,7 +1019,7 @@
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testCreateNewShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception {
+    public void testCreateNewDatasetDuringRecoveryAttemptThatFailsRuntime() throws Exception {
         testStartWhenStartSucceed();
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
@@ -1006,7 +1045,7 @@
     }
 
     @Test
-    public void testCreateNewShadowWhileStarting() throws Exception {
+    public void testCreateNewDatasetWhileStarting() throws Exception {
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         listener.onStart(Behavior.STEP_SUCCEED);
         Action startAction = users[0].startActivity(listener);
@@ -1027,7 +1066,7 @@
     }
 
     @Test
-    public void testCreateNewShadowWhileRunning() throws Exception {
+    public void testCreateNewDatasetWhileRunning() throws Exception {
         testStartWhenStartSucceed();
         Dataset newDataset =
                 new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0);
@@ -1040,7 +1079,7 @@
     }
 
     @Test
-    public void testCreateNewShadowWhileSuspended() throws Exception {
+    public void testCreateNewDatasetWhileSuspended() throws Exception {
         testStartWhenStartSucceed();
         // suspend
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
@@ -1065,22 +1104,22 @@
     }
 
     @Test
-    public void testCreateNewShadowWhilePermanentFailure() throws Exception {
+    public void testCreateNewDatasetWhilePermanentFailure() throws Exception {
         testRecoveryFailureAfterOneAttemptCompilationFailure();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Dataset newDataset =
                 new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0);
         Action createDatasetAction = users[0].addDataset(newDataset, listener);
         createDatasetAction.sync();
         assertSuccess(createDatasetAction);
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Assert.assertEquals(3, listener.getDatasets().size());
         Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
     }
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testDeleteShadowDuringRecoveryAttemptThatSucceeds() throws Exception {
+    public void testDeleteDatasetDuringRecoveryAttemptThatSucceeds() throws Exception {
         testStartWhenStartSucceed();
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
@@ -1106,7 +1145,7 @@
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testDeleteShadowDuringRecoveryAttemptThatFailsCompile() throws Exception {
+    public void testDeleteDatasetDuringRecoveryAttemptThatFailsCompile() throws Exception {
         testStartWhenStartSucceed();
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
@@ -1131,7 +1170,7 @@
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testDeleteShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception {
+    public void testDeleteDatasetDuringRecoveryAttemptThatFailsRuntime() throws Exception {
         testStartWhenStartSucceed();
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
@@ -1155,7 +1194,7 @@
     }
 
     @Test
-    public void testDeleteShadowWhileStarting() throws Exception {
+    public void testDeleteDatasetWhileStarting() throws Exception {
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         listener.onStart(Behavior.STEP_SUCCEED);
         Action startAction = users[0].startActivity(listener);
@@ -1174,7 +1213,7 @@
     }
 
     @Test
-    public void testDeleteShadowWhileRunning() throws Exception {
+    public void testDeleteDatasetWhileRunning() throws Exception {
         testStartWhenStartSucceed();
         Action dropDatasetAction = users[1].dropDataset(firstDataset, listener);
         dropDatasetAction.sync();
@@ -1185,19 +1224,19 @@
     }
 
     @Test
-    public void testDeleteShadowWhilePermanentFailure() throws Exception {
+    public void testDeleteDatasetWhilePermanentFailure() throws Exception {
         testRecoveryFailureAfterOneAttemptCompilationFailure();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Action dropDatasetAction = users[0].dropDataset(secondDataset, listener);
         dropDatasetAction.sync();
         assertSuccess(dropDatasetAction);
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Assert.assertEquals(1, listener.getDatasets().size());
         Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
     }
 
     @Test
-    public void testDeleteShadowWhileSuspended() throws Exception {
+    public void testDeleteDatasetWhileSuspended() throws Exception {
         testStartWhenStartSucceed();
         // suspend
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
@@ -1317,7 +1356,7 @@
     @Test
     public void testCreateNewIndexWhilePermanentFailure() throws Exception {
         testRecoveryFailureAfterOneAttemptCompilationFailure();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Action add = users[1].addIndex(firstDataset, listener);
         add.sync();
         assertSuccess(add);
@@ -1442,7 +1481,7 @@
     @Test
     public void testDeleteIndexWhilePermanentFailure() throws Exception {
         testRecoveryFailureAfterOneAttemptCompilationFailure();
-        Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Action drop = users[1].dropIndex(firstDataset, listener);
         drop.sync();
         assertSuccess(drop);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index c269803..47fc46f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.active;
 
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.asterix.active.ActivityState;
@@ -35,7 +34,6 @@
 import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -64,14 +62,13 @@
     }
 
     @Override
-    protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+    protected void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
         IActiveEntityEventSubscriber eventSubscriber =
-                new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.PERMANENTLY_FAILED));
+                new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED));
         try {
             eventSubscriber.sync();
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
-        return null;
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 7e1bc37..c771a94 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -19,11 +19,10 @@
 package org.apache.asterix.test.active;
 
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Semaphore;
 
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IRetryPolicyFactory;
@@ -31,17 +30,19 @@
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.LockList;
-import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.util.ExitUtil;
 
 public class TestEventsListener extends ActiveEntityEventsListener {
 
@@ -50,6 +51,9 @@
         RUNNING_JOB_FAIL,
         FAIL_COMPILE,
         FAIL_RUNTIME,
+        FAIL_START_TIMEOUT_OP_SUCCEED,
+        FAIL_START_TIMEOUT_STUCK,
+        FAIL_STOP_TIMEOUT,
         STEP_SUCCEED,
         STEP_FAIL_COMPILE,
         STEP_FAIL_RUNTIME
@@ -103,9 +107,8 @@
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Override
-    protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException {
+    protected JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException {
         step(onStart);
         try {
             metadataProvider.getApplicationContext().getMetadataLockManager()
@@ -119,60 +122,72 @@
         try {
             startJob.sync();
         } catch (InterruptedException e) {
-            throw HyracksDataException.create(e);
+            ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
         }
-        WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
-                EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
         if (onStart == Behavior.FAIL_RUNTIME || onStart == Behavior.STEP_FAIL_RUNTIME) {
             clusterController.jobFinish(jobId, JobStatus.FAILURE,
                     Collections.singletonList(new HyracksDataException("RuntimeFailure")));
-        } else {
+        } else if (onStart != Behavior.FAIL_START_TIMEOUT_OP_SUCCEED && onStart != Behavior.FAIL_START_TIMEOUT_STUCK) {
             for (int i = 0; i < nodeControllers.length; i++) {
                 TestNodeControllerActor nodeController = nodeControllers[i];
                 nodeController.registerRuntime(jobId, entityId, i);
             }
         }
-        try {
-            subscriber.sync();
-            if (subscriber.getFailure() != null) {
-                throw subscriber.getFailure();
+        if (onStart == Behavior.FAIL_START_TIMEOUT_OP_SUCCEED) {
+            for (int i = 0; i < nodeControllers.length; i++) {
+                TestNodeControllerActor nodeController = nodeControllers[i];
+                try {
+                    nodeController.registerRuntime(jobId, entityId, i).sync();
+                } catch (InterruptedException e) {
+                    ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+                }
             }
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            // At this point, the job has started and both nodes have reported that they started,
+            // but since we're holding the lock on the listener (this is a synchronized method), the state
+            // hasn't changed yet.
+            while (state != ActivityState.RUNNING) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+                }
+            }
+            Thread.currentThread().interrupt();
+        } else if (onStart == Behavior.FAIL_START_TIMEOUT_STUCK) {
+            TestNodeControllerActor nodeController = nodeControllers[0];
+            try {
+                nodeController.registerRuntime(jobId, entityId, 0).sync();
+            } catch (InterruptedException e) {
+                ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+            }
+            Thread.currentThread().interrupt();
         }
+        return jobId;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
-    protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
-        ActivityState intention = state;
+    protected void cancelJobSafely(MetadataProvider metadataProvider, Throwable th) {
+        clusterController.jobFinish(jobId, JobStatus.FAILURE,
+                Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)));
+    }
+
+    @Override
+    protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception {
         step(onStop);
         failCompile(onStop);
-        try {
-            Set<ActivityState> waitFor;
-            if (intention == ActivityState.STOPPING) {
-                waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED);
-            } else if (intention == ActivityState.SUSPENDING) {
-                waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
-            } else {
-                throw new IllegalStateException("stop with what intention??");
+        if (onStop == Behavior.RUNNING_JOB_FAIL) {
+            clusterController.jobFinish(jobId, JobStatus.FAILURE,
+                    Collections.singletonList(new HyracksDataException("RuntimeFailure")));
+        } else if (onStop == Behavior.FAIL_STOP_TIMEOUT) {
+            // Nothing happens.
+            Thread.currentThread().interrupt();
+        } else {
+            for (int i = 0; i < nodeControllers.length; i++) {
+                TestNodeControllerActor nodeController = nodeControllers[0];
+                nodeController.deRegisterRuntime(jobId, entityId, i).sync();
             }
-            WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor);
-            if (onStop == Behavior.RUNNING_JOB_FAIL) {
-                clusterController.jobFinish(jobId, JobStatus.FAILURE,
-                        Collections.singletonList(new HyracksDataException("RuntimeFailure")));
-            } else {
-                for (int i = 0; i < nodeControllers.length; i++) {
-                    TestNodeControllerActor nodeController = nodeControllers[0];
-                    nodeController.deRegisterRuntime(jobId, entityId, i).sync();
-                }
-                clusterController.jobFinish(jobId, JobStatus.TERMINATED, Collections.emptyList());
-            }
-            subscriber.sync();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            clusterController.jobFinish(jobId, JobStatus.TERMINATED, Collections.emptyList());
         }
-        return null;
     }
 
     public void onStart(Behavior behavior) {
@@ -184,25 +199,31 @@
     }
 
     @Override
-    protected void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException {
+    protected void setRunning(MetadataProvider metadataProvider, boolean running) {
         try {
             IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
             LockList locks = metadataProvider.getLocks();
             lockManager.acquireDataverseReadLock(locks, entityId.getDataverse());
             lockManager.acquireActiveEntityWriteLock(locks, entityId.getDataverse() + '.' + entityId.getEntityName());
             // persist entity
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        } catch (Throwable th) {
+            // This failure puts the system in a bad state.
+            throw new IllegalStateException(th);
         }
     }
 
     @Override
-    protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
-        return doStop(metadataProvider);
+    protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
+        doStop(metadataProvider);
     }
 
     @Override
     protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException {
         doStart(metadataProvider);
     }
+
+    @Override
+    protected ActiveRuntimeId getActiveRuntimeId(int partition) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 880c4b4..37b157e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.hyracks.util.Span;
 
 public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
 
@@ -55,6 +56,18 @@
         }
     }
 
+    public boolean sync(Span span) throws InterruptedException {
+        synchronized (listener) {
+            while (!done) {
+                span.wait(listener);
+                if (done || span.elapsed()) {
+                    return done;
+                }
+            }
+            return done;
+        }
+    }
+
     public Exception getFailure() {
         return failure;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
index 8230b48..627e563 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -21,7 +21,6 @@
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An event subscriber that does not listen to any events
@@ -49,7 +48,7 @@
     }
 
     @Override
-    public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+    public void subscribed(IActiveEntityEventsListener eventsListener) {
         // no op
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
index a571904..31459d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
@@ -30,24 +30,16 @@
     }
 
     @Override
-    public void notify(ActiveEvent event) throws HyracksDataException {
+    public void notify(ActiveEvent event) {
         if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) {
-            try {
-                complete(null);
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
-            }
+            complete(null);
         } else if (event.getEventKind() == ActiveEvent.Kind.FAILURE) {
-            try {
-                complete((Exception) event.getEventObject());
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
-            }
+            complete((Exception) event.getEventObject());
         }
     }
 
     @Override
-    public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+    public void subscribed(IActiveEntityEventsListener eventsListener) {
         //Does nothing upon subscription
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index a1cdfb0..818d826 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -23,39 +23,28 @@
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class WaitForStateSubscriber extends AbstractSubscriber {
 
     private final Set<ActivityState> targetStates;
 
-    public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates)
-            throws HyracksDataException {
+    public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) {
         super(listener);
         this.targetStates = targetStates;
         listener.subscribe(this);
     }
 
     @Override
-    public void notify(ActiveEvent event) throws HyracksDataException {
+    public void notify(ActiveEvent event) {
         if (targetStates.contains(listener.getState())) {
-            if (listener.getState() == ActivityState.PERMANENTLY_FAILED
-                    || listener.getState() == ActivityState.TEMPORARILY_FAILED) {
-                complete(listener.getJobFailure());
-            } else {
-                complete(null);
-            }
+            complete(listener.getJobFailure());
         } else if (event != null && event.getEventKind() == ActiveEvent.Kind.FAILURE) {
-            try {
-                complete((Exception) event.getEventObject());
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
-            }
+            complete((Exception) event.getEventObject());
         }
     }
 
     @Override
-    public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+    public void subscribed(IActiveEntityEventsListener eventsListener) {
         if (targetStates.contains(listener.getState())) {
             complete(null);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 5492919..79d3fac 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -39,6 +39,7 @@
     public static final int EC_FAILED_TO_ABORT_METADATA_TXN = 7;
     public static final int EC_INCONSISTENT_METADATA = 8;
     public static final int EC_UNHANDLED_EXCEPTION = 11;
+    public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
     public static final int EC_IO_SCHEDULER_FAILED = 55;