[NO ISSUE][ING] Stop Succeeds When Feed Fails
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Before this change, if a feed fails after stop is issued, it
goes to a failure state and the stop request will be stuck
waiting for the feed to go to a stopped state.
- With this change, a failure in the feed will cause it to go
to a stopped state if a stop request was issued.
- Test case is added
Change-Id: I486e11a339c1dfadc6a2a357c95a83dfc05772a1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2258
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 8cbc109..12bcc0d2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -193,9 +193,9 @@
if (jobStatus.equals(JobStatus.FAILURE)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
- setState(ActivityState.TEMPORARILY_FAILED);
+ setState((state == ActivityState.STOPPING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED);
if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING
- && prevState != ActivityState.RESUMING) {
+ && prevState != ActivityState.RESUMING && prevState != ActivityState.STOPPING) {
recover();
}
} else {
@@ -551,4 +551,10 @@
public String getDisplayName() throws HyracksDataException {
return this.getEntityId().toString();
}
+
+ @Override
+ public String toString() {
+ return "{\"class\":\"" + getClass().getSimpleName() + "\"" + "\"entityId\":\"" + entityId + "\""
+ + "\"state\":\"" + state + "\"" + "}";
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 40d4f6a..093d150 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -346,6 +346,27 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
+ @Test
+ public void testStopFromRunningAndJobFails() throws Exception {
+ testStartWhenStartSucceed();
+ Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+ listener.onStop(Behavior.STEP_SUCCEED);
+ Action stopping = users[1].stopActivity(listener);
+ // wait for notification from listener
+ synchronized (listener) {
+ listener.wait();
+ }
+ clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
+ Collections.singletonList(new HyracksDataException("Runtime Failure")));
+ Assert.assertNull(listener.getRecoveryTask());
+ listener.allowStep();
+ stopping.sync();
+ Assert.assertFalse(stopping.hasFailed());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ Assert.assertNull(listener.getRecoveryTask());
+
+ }
+
@SuppressWarnings("deprecation")
@Test
public void testRecovery() throws Exception {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index c50a4a2..fcd18d0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -34,4 +34,9 @@
protected void handle(Action action) throws Exception {
action.execute(actorMdProvider);
}
+
+ @Override
+ public String toString() {
+ return "{\"name\":\"" + name + "\"}";
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 1cedc96..7e1bc37 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -145,13 +145,14 @@
@SuppressWarnings("deprecation")
@Override
protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+ ActivityState intention = state;
step(onStop);
failCompile(onStop);
try {
Set<ActivityState> waitFor;
- if (state == ActivityState.STOPPING) {
+ if (intention == ActivityState.STOPPING) {
waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED);
- } else if (state == ActivityState.SUSPENDING) {
+ } else if (intention == ActivityState.SUSPENDING) {
waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
} else {
throw new IllegalStateException("stop with what intention??");
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index 9eb3b8e..ccf0163 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -29,7 +29,7 @@
public abstract class SingleThreadEventProcessor<T> implements Runnable {
private static final Logger LOGGER = LogManager.getLogger();
- private final String name;
+ protected final String name;
private final LinkedBlockingQueue<T> eventInbox;
private volatile Thread executorThread;
private volatile boolean stopped = false;