[NO ISSUE][ING] Cleanup active listener
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Merge STOPPED and PERMANENTLY_FAILED into a single
state since they are equivalent.
- Allow timeout of start and stop of active entities.
- Add test cases for timeout scenarios.
- Refactor common code into the abstract listener class.
Change-Id: I7f3b14aec46728fbe8b256b915d0e30992b2fe47
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2618
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index eb43d10..1f3daac 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -24,11 +24,6 @@
*/
STOPPED,
/**
- * Failure to recover from a temporary faliure caused the activity to fail permanantly.
- * No further recovery attempts will be made.
- */
- PERMANENTLY_FAILED,
- /**
* An unexpected failure caused the activity to fail but recovery attempts will start taking place
*/
TEMPORARILY_FAILED,
@@ -41,6 +36,10 @@
*/
STARTING,
/**
+ * The activity was started but is being cancelled. Waiting for the job cancellation to complete
+ */
+ CANCELLING,
+ /**
* The activity has been started successfully and is running
*/
RUNNING,
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index 142e84b..e01d0a7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -31,7 +31,7 @@
* @param event
* @throws HyracksDataException
*/
- void notify(ActiveEvent event) throws HyracksDataException;
+ void notify(ActiveEvent event);
/**
* Checkcs whether the subscriber is done receiving events
@@ -53,5 +53,5 @@
* @param eventsListener
* @throws HyracksDataException
*/
- void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException;
+ void subscribed(IActiveEntityEventsListener eventsListener);
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 37120e4..2d36bb2 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -56,9 +56,8 @@
* subscribe to events. subscription ends when subscriber.done() returns true
*
* @param subscriber
- * @throws HyracksDataException
*/
- void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException;
+ void subscribe(IActiveEntityEventSubscriber subscriber);
/**
* The most recent acquired stats for the active entity
@@ -99,6 +98,7 @@
/**
* Get the stats name that's used to form the stats JSON for the active entity
+ *
* @return the customized stats name for current active entity
*/
String getDisplayName() throws HyracksDataException;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 2a214e3..66d3e81 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -23,8 +23,10 @@
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
@@ -34,6 +36,7 @@
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
@@ -55,6 +58,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -65,7 +71,7 @@
private static final Level level = Level.INFO;
private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null);
private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING,
- ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING);
+ ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING, ActivityState.CANCELLING);
private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
// finals
protected final IClusterStateManager clusterStateManager;
@@ -129,7 +135,10 @@
LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state);
this.prevState = state;
this.state = newState;
- if (newState == ActivityState.SUSPENDED) {
+ if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING
+ || newState == ActivityState.RESUMING) {
+ jobFailure = null;
+ } else if (newState == ActivityState.SUSPENDED) {
suspended = true;
}
notifySubscribers(STATE_CHANGED);
@@ -142,10 +151,7 @@
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
- jobCreated(event);
- break;
case JOB_STARTED:
- start(event);
break;
case JOB_FINISHED:
finish(event);
@@ -163,14 +169,10 @@
}
}
- protected void jobCreated(ActiveEvent event) {
- // Do nothing
- }
-
protected synchronized void handle(ActivePartitionMessage message) {
if (message.getEvent() == Event.RUNTIME_REGISTERED) {
numRegistered++;
- if (numRegistered == locations.getLocations().length) {
+ if (allPartitionsRegisteredAndNotCancelling()) {
setState(ActivityState.RUNNING);
}
} else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
@@ -178,6 +180,10 @@
}
}
+ private boolean allPartitionsRegisteredAndNotCancelling() {
+ return numRegistered == locations.getLocations().length && state != ActivityState.CANCELLING;
+ }
+
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
LOGGER.log(level, "the job " + jobId + " finished");
@@ -191,10 +197,11 @@
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
LOGGER.log(level, "The job finished with status: " + jobStatus);
- if (jobStatus.equals(JobStatus.FAILURE)) {
+ if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
- setState((state == ActivityState.STOPPING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED);
+ setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED
+ : ActivityState.TEMPORARILY_FAILED);
if (prevState == ActivityState.RUNNING) {
recover();
}
@@ -203,14 +210,12 @@
}
}
- protected void start(ActiveEvent event) {
- jobId = event.getJobId();
- numRegistered = 0;
- numDeRegistered = 0;
+ private boolean jobSuccessfullyTerminated(JobStatus jobStatus) {
+ return jobStatus.equals(JobStatus.TERMINATED);
}
@Override
- public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
+ public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) {
subscriber.subscribed(this);
if (!subscriber.isDone()) {
subscribers.add(subscriber);
@@ -315,11 +320,7 @@
if (subscriber.isDone()) {
it.remove();
} else {
- try {
- subscriber.notify(event);
- } catch (HyracksDataException e) {
- LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
- }
+ subscriber.notify(event);
if (subscriber.isDone()) {
it.remove();
}
@@ -342,22 +343,12 @@
}
}
- /**
- * this method is called before an action call is returned. It ensures that the request didn't fail
- *
- */
- protected synchronized void checkNoFailure() throws HyracksDataException {
- if (state == ActivityState.PERMANENTLY_FAILED) {
- throw HyracksDataException.create(jobFailure);
- }
- }
-
@Override
public synchronized void recover() {
LOGGER.log(level, "Recover is called on " + entityId);
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
- setState(ActivityState.PERMANENTLY_FAILED);
+ setState(ActivityState.STOPPED);
} else {
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
setState(ActivityState.TEMPORARILY_FAILED);
@@ -371,7 +362,7 @@
public synchronized void start(MetadataProvider metadataProvider)
throws HyracksDataException, InterruptedException {
waitForNonTransitionState();
- if (state != ActivityState.PERMANENTLY_FAILED && state != ActivityState.STOPPED) {
+ if (state != ActivityState.STOPPED) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED, entityId, state);
}
try {
@@ -379,30 +370,131 @@
doStart(metadataProvider);
setRunning(metadataProvider, true);
} catch (Exception e) {
- setState(ActivityState.PERMANENTLY_FAILED);
+ setState(ActivityState.STOPPED);
LOGGER.log(Level.ERROR, "Failed to start the entity " + entityId, e);
throw HyracksDataException.create(e);
}
}
- protected abstract void doStart(MetadataProvider metadataProvider) throws HyracksDataException;
+ @SuppressWarnings("squid:S1181")
+ protected synchronized void doStart(MetadataProvider metadataProvider) throws HyracksDataException {
+ WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
+ EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.STOPPED));
+ jobId = compileAndStartJob(metadataProvider);
+ numRegistered = 0;
+ numDeRegistered = 0;
+ try {
+ subscriber.sync();
+ if (subscriber.getFailure() != null) {
+ throw subscriber.getFailure();
+ }
+ } catch (InterruptedException ie) {
+ // interrupted.. check if the subscriber is done
+ if (subscriber.isDone()) {
+ if (subscriber.getFailure() != null) {
+ throw HyracksDataException.create(subscriber.getFailure());
+ }
+ Thread.currentThread().interrupt();
+ } else {
+ // Subscriber is not done yet, so we need to cancel the job; we already have the jobId
+ setState(ActivityState.CANCELLING);
+ cancelJob(ie);
+ throw HyracksDataException.create(ie);
+ }
+ } catch (Throwable e) {
+ throw HyracksDataException.create(e);
+ }
+ }
- protected abstract Void doStop(MetadataProvider metadataProvider) throws HyracksDataException;
+ private void cancelJob(Throwable th) {
+ cancelJobSafely(metadataProvider, th);
+ final WaitForStateSubscriber cancelSubscriber =
+ new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
+ final Span span = Span.start(2, TimeUnit.MINUTES);
+ InvokeUtil.doUninterruptibly(() -> {
+ if (!cancelSubscriber.sync(span)) {
+ ExitUtil.halt(ExitUtil.EC_FAILED_TO_CANCEL_ACTIVE_START_STOP);
+ }
+ });
+ }
- protected abstract Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException;
+ @SuppressWarnings("squid:S1181")
+ protected void cancelJobSafely(MetadataProvider metadataProvider, Throwable e) {
+ try {
+ metadataProvider.getApplicationContext().getHcc().cancelJob(jobId);
+ } catch (Throwable th) {
+ LOGGER.warn("Failed to cancel active job", th);
+ e.addSuppressed(th);
+ }
+ }
+
+ protected abstract JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException;
+
+ @SuppressWarnings("squid:S1181")
+ protected synchronized void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+ ActivityState intention = state;
+ Set<ActivityState> waitFor;
+ if (intention == ActivityState.STOPPING) {
+ waitFor = EnumSet.of(ActivityState.STOPPED);
+ } else if (intention == ActivityState.SUSPENDING) {
+ waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
+ } else {
+ throw new IllegalStateException("stop with what intention?? Current state is " + intention);
+ }
+ WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor);
+ // Note: once we start sending stop messages, we can't go back until the entity is stopped
+ try {
+ sendStopMessages(metadataProvider);
+ LOGGER.log(Level.DEBUG, "Waiting for its state to become " + waitFor);
+ subscriber.sync();
+ LOGGER.log(Level.DEBUG, "Disconnect has been completed " + waitFor);
+ } catch (InterruptedException ie) {
+ forceStop(subscriber, ie);
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ forceStop(subscriber, e);
+ }
+ }
+
+ private void forceStop(WaitForStateSubscriber subscriber, Throwable e) {
+ if (!subscriber.isDone()) {
+ cancelJob(e);
+ }
+ // Stop should not throw an exception if the entity was stopped.
+ // Simply log
+ LOGGER.warn("Failure encountered while stopping {}", this, e);
+ }
+
+ protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception {
+ ICcApplicationContext applicationCtx = metadataProvider.getApplicationContext();
+ ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker();
+ AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations();
+ int partition = 0;
+ LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
+ for (String location : runtimeLocations.getLocations()) {
+ LOGGER.log(Level.INFO, "Sending to " + location);
+ messageBroker.sendApplicationMessageToNC(
+ new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, getActiveRuntimeId(partition++)),
+ location);
+ }
+ }
+
+ protected abstract ActiveRuntimeId getActiveRuntimeId(int partition);
+
+ protected abstract void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException;
protected abstract void doResume(MetadataProvider metadataProvider) throws HyracksDataException;
- protected abstract void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException;
+ protected abstract void setRunning(MetadataProvider metadataProvider, boolean running);
@Override
- public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
+ public final synchronized void stop(MetadataProvider metadataProvider)
+ throws HyracksDataException, InterruptedException {
waitForNonTransitionState();
- if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
- && state != ActivityState.TEMPORARILY_FAILED) {
+ if (state != ActivityState.RUNNING && state != ActivityState.TEMPORARILY_FAILED) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
}
- if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
+ if (state == ActivityState.TEMPORARILY_FAILED) {
if (rt != null) {
setState(ActivityState.STOPPING);
rt.cancel();
@@ -419,11 +511,12 @@
setState(ActivityState.STOPPING);
try {
doStop(metadataProvider);
- setRunning(metadataProvider, false);
} catch (Exception e) {
- setState(ActivityState.PERMANENTLY_FAILED);
+ setState(ActivityState.STOPPED);
LOGGER.log(Level.ERROR, "Failed to stop the entity " + entityId, e);
throw HyracksDataException.create(e);
+ } finally {
+ setRunning(metadataProvider, false);
}
} else {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
@@ -445,7 +538,7 @@
LOGGER.log(level, "Waiting for ongoing activities");
waitForNonTransitionState();
LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
- if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
+ if (state == ActivityState.STOPPED) {
suspended = true;
return;
}
@@ -461,7 +554,10 @@
subscriber = new WaitForStateSubscriber(this,
EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED));
suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
- .getExecutor().submit(() -> doSuspend(metadataProvider));
+ .getExecutor().submit(() -> {
+ doSuspend(metadataProvider);
+ return null;
+ });
LOGGER.log(level, "Suspension task has been submitted");
}
try {
@@ -479,7 +575,7 @@
// restore state
setState(prevState);
} else {
- setState(ActivityState.PERMANENTLY_FAILED);
+ setState(ActivityState.STOPPED);
}
}
throw HyracksDataException.create(e);
@@ -489,7 +585,7 @@
@Override
public synchronized void resume(MetadataProvider metadataProvider) throws HyracksDataException {
- if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
+ if (state == ActivityState.STOPPED) {
suspended = false;
notifyAll();
return;
@@ -517,7 +613,7 @@
@Override
public boolean isActive() {
- return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED;
+ return state != ActivityState.STOPPED && state != ActivityState.CANCELLING;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index fbf644f..99ed42d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -21,21 +21,17 @@
import java.util.EnumSet;
import java.util.List;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IRetryPolicyFactory;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.utils.JobUtils;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
-import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.lang.common.statement.StartFeedStatement;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -45,9 +41,9 @@
import org.apache.asterix.utils.FeedOperations;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
public class FeedEventsListener extends ActiveEntityEventsListener {
@@ -80,58 +76,43 @@
}
@Override
- protected void doStart(MetadataProvider mdProvider) throws HyracksDataException {
+ public synchronized void start(MetadataProvider metadataProvider)
+ throws HyracksDataException, InterruptedException {
+ super.start(metadataProvider);
+ // Note: The current implementation of the wait for completion flag is problematic due to locking issues:
+ // Locks obtained during the start of the feed are not released, and so, the feed can't be stopped
+ // and also, read locks over dataverses, datasets, etc, are never released.
+ boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+ if (wait) {
+ IActiveEntityEventSubscriber stoppedSubscriber =
+ new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
+ stoppedSubscriber.sync();
+ }
+ }
+
+ @Override
+ protected JobId compileAndStartJob(MetadataProvider mdProvider) throws HyracksDataException {
try {
Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
JobSpecification feedJob = jobInfo.getLeft();
- WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING,
- ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
// TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
// We will need to design general exception handling mechanism for feeds.
setLocations(jobInfo.getRight());
- boolean wait = Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
- JobUtils.runJob(hcc, feedJob, false);
- eventSubscriber.sync();
- if (eventSubscriber.getFailure() != null) {
- throw eventSubscriber.getFailure();
- }
- if (wait) {
- IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this,
- EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
- stoppedSubscriber.sync();
- }
+ return JobUtils.runJob(hcc, feedJob, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
}
@Override
- protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
- IActiveEntityEventSubscriber eventSubscriber =
- new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
- try {
- // Construct ActiveMessage
- for (int i = 0; i < getLocations().getLocations().length; i++) {
- String intakeLocation = getLocations().getLocations()[i];
- FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId, intakeLocation,
- i);
- }
- eventSubscriber.sync();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- return null;
- }
-
- @Override
- protected void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException {
+ protected void setRunning(MetadataProvider metadataProvider, boolean running) {
// No op
}
@Override
- protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
+ protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
@@ -139,4 +120,9 @@
protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
+
+ @Override
+ protected ActiveRuntimeId getActiveRuntimeId(int partition) {
+ return new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 3024dc6..9531b63 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -92,7 +92,7 @@
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
synchronized (listener) {
if (!cancelRecovery) {
- listener.setState(ActivityState.PERMANENTLY_FAILED);
+ listener.setState(ActivityState.STOPPED);
listener.setRunning(metadataProvider, false);
}
}
@@ -172,7 +172,7 @@
}
if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
- listener.setState(ActivityState.PERMANENTLY_FAILED);
+ listener.setState(ActivityState.STOPPED);
listener.setRunning(metadataProvider, false);
}
listener.notifyAll();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dc057cb..593d7ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -29,10 +29,6 @@
import java.util.Set;
import java.util.TreeSet;
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -42,7 +38,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
@@ -52,7 +47,6 @@
import org.apache.asterix.external.feed.watch.FeedActivityDetails;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
@@ -485,17 +479,4 @@
return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections,
ingestionLocations), intakeInfo.getRight().getPartitionConstraint());
}
-
- public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation,
- Integer partition) throws Exception {
- ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(Kind.STOP_ACTIVITY,
- new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
- SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
- }
-
- private static void SendActiveMessage(ICcApplicationContext appCtx, ActiveManagerMessage activeManagerMessage,
- String nodeId) throws Exception {
- ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- messageBroker.sendApplicationMessageToNC(activeManagerMessage, nodeId);
- }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 9612ead..6c95958 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -180,7 +180,7 @@
Action action = users[0].startActivity(listener);
action.sync();
assertFailure(action, 0);
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@Test
@@ -190,7 +190,42 @@
Action action = users[0].startActivity(listener);
action.sync();
assertFailure(action, 0);
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ }
+
+ @Test
+ public void testStartWhenStartSucceedButTimesout() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.FAIL_START_TIMEOUT_OP_SUCCEED);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertSuccess(action);
+ Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+ }
+
+ @Test
+ public void testStartWhenStartStuckTimesout() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.FAIL_START_TIMEOUT_STUCK);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertFailure(action, 0);
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ }
+
+ @Test
+ public void testStopWhenStopTimesout() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.SUCCEED);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertSuccess(action);
+ Assert.assertEquals(ActivityState.RUNNING, listener.getState());
+ listener.onStop(Behavior.FAIL_STOP_TIMEOUT);
+ action = users[0].stopActivity(listener);
+ action.sync();
+ assertSuccess(action);
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@Test
@@ -336,14 +371,18 @@
}
@Test
- public void testSuspendFromRunningAndStopFail() throws Exception {
+ public void testSuspendFromRunningAndStopFailThenResumeSucceeds() throws Exception {
testStartWhenStartSucceed();
// suspend
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
listener.onStop(Behavior.FAIL_COMPILE);
Action suspension = users[1].suspendActivity(listener);
suspension.sync();
- Assert.assertTrue(suspension.hasFailed());
+ Assert.assertFalse(suspension.hasFailed());
+ Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState());
+ Action resumption = users[1].resumeActivity(listener);
+ resumption.sync();
+ assertSuccess(resumption);
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
@@ -492,19 +531,19 @@
WaitForStateSubscriber tempFailSubscriber =
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
WaitForStateSubscriber permFailSubscriber =
- new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED));
+ new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
listener.onStart(Behavior.FAIL_COMPILE);
clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
Collections.singletonList(new HyracksDataException("Compilation Failure")));
tempFailSubscriber.sync();
permFailSubscriber.sync();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@Test
public void testStartAfterPermenantFailure() throws Exception {
testRecoveryFailureAfterOneAttemptCompilationFailure();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
listener.onStart(Behavior.SUCCEED);
WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
users[1].startActivity(listener);
@@ -536,13 +575,13 @@
WaitForStateSubscriber tempFailSubscriber =
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
WaitForStateSubscriber permFailSubscriber =
- new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED));
+ new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
listener.onStart(Behavior.FAIL_RUNTIME);
clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
Collections.singletonList(new HyracksDataException("Runtime Failure")));
tempFailSubscriber.sync();
permFailSubscriber.sync();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@SuppressWarnings("deprecation")
@@ -555,12 +594,12 @@
WaitForStateSubscriber tempFailSubscriber =
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
WaitForStateSubscriber permFailSubscriber =
- new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED));
+ new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
Collections.singletonList(new HyracksDataException("Runtime Failure")));
tempFailSubscriber.sync();
permFailSubscriber.sync();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
@SuppressWarnings("deprecation")
@@ -925,7 +964,7 @@
@SuppressWarnings("deprecation")
@Test
- public void testCreateNewShadowDuringRecoveryAttemptThatSucceeds() throws Exception {
+ public void testCreateNewDatasetDuringRecoveryAttemptThatSucceeds() throws Exception {
testStartWhenStartSucceed();
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
@@ -953,7 +992,7 @@
@SuppressWarnings("deprecation")
@Test
- public void testCreateNewShadowDuringRecoveryAttemptThatFailsCompile() throws Exception {
+ public void testCreateNewDatasetDuringRecoveryAttemptThatFailsCompile() throws Exception {
testStartWhenStartSucceed();
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
@@ -980,7 +1019,7 @@
@SuppressWarnings("deprecation")
@Test
- public void testCreateNewShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception {
+ public void testCreateNewDatasetDuringRecoveryAttemptThatFailsRuntime() throws Exception {
testStartWhenStartSucceed();
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
@@ -1006,7 +1045,7 @@
}
@Test
- public void testCreateNewShadowWhileStarting() throws Exception {
+ public void testCreateNewDatasetWhileStarting() throws Exception {
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
listener.onStart(Behavior.STEP_SUCCEED);
Action startAction = users[0].startActivity(listener);
@@ -1027,7 +1066,7 @@
}
@Test
- public void testCreateNewShadowWhileRunning() throws Exception {
+ public void testCreateNewDatasetWhileRunning() throws Exception {
testStartWhenStartSucceed();
Dataset newDataset =
new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0);
@@ -1040,7 +1079,7 @@
}
@Test
- public void testCreateNewShadowWhileSuspended() throws Exception {
+ public void testCreateNewDatasetWhileSuspended() throws Exception {
testStartWhenStartSucceed();
// suspend
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
@@ -1065,22 +1104,22 @@
}
@Test
- public void testCreateNewShadowWhilePermanentFailure() throws Exception {
+ public void testCreateNewDatasetWhilePermanentFailure() throws Exception {
testRecoveryFailureAfterOneAttemptCompilationFailure();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
Dataset newDataset =
new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0);
Action createDatasetAction = users[0].addDataset(newDataset, listener);
createDatasetAction.sync();
assertSuccess(createDatasetAction);
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
Assert.assertEquals(3, listener.getDatasets().size());
Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
}
@SuppressWarnings("deprecation")
@Test
- public void testDeleteShadowDuringRecoveryAttemptThatSucceeds() throws Exception {
+ public void testDeleteDatasetDuringRecoveryAttemptThatSucceeds() throws Exception {
testStartWhenStartSucceed();
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
@@ -1106,7 +1145,7 @@
@SuppressWarnings("deprecation")
@Test
- public void testDeleteShadowDuringRecoveryAttemptThatFailsCompile() throws Exception {
+ public void testDeleteDatasetDuringRecoveryAttemptThatFailsCompile() throws Exception {
testStartWhenStartSucceed();
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
@@ -1131,7 +1170,7 @@
@SuppressWarnings("deprecation")
@Test
- public void testDeleteShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception {
+ public void testDeleteDatasetDuringRecoveryAttemptThatFailsRuntime() throws Exception {
testStartWhenStartSucceed();
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
@@ -1155,7 +1194,7 @@
}
@Test
- public void testDeleteShadowWhileStarting() throws Exception {
+ public void testDeleteDatasetWhileStarting() throws Exception {
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
listener.onStart(Behavior.STEP_SUCCEED);
Action startAction = users[0].startActivity(listener);
@@ -1174,7 +1213,7 @@
}
@Test
- public void testDeleteShadowWhileRunning() throws Exception {
+ public void testDeleteDatasetWhileRunning() throws Exception {
testStartWhenStartSucceed();
Action dropDatasetAction = users[1].dropDataset(firstDataset, listener);
dropDatasetAction.sync();
@@ -1185,19 +1224,19 @@
}
@Test
- public void testDeleteShadowWhilePermanentFailure() throws Exception {
+ public void testDeleteDatasetWhilePermanentFailure() throws Exception {
testRecoveryFailureAfterOneAttemptCompilationFailure();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
Action dropDatasetAction = users[0].dropDataset(secondDataset, listener);
dropDatasetAction.sync();
assertSuccess(dropDatasetAction);
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
Assert.assertEquals(1, listener.getDatasets().size());
Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
}
@Test
- public void testDeleteShadowWhileSuspended() throws Exception {
+ public void testDeleteDatasetWhileSuspended() throws Exception {
testStartWhenStartSucceed();
// suspend
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
@@ -1317,7 +1356,7 @@
@Test
public void testCreateNewIndexWhilePermanentFailure() throws Exception {
testRecoveryFailureAfterOneAttemptCompilationFailure();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
Action add = users[1].addIndex(firstDataset, listener);
add.sync();
assertSuccess(add);
@@ -1442,7 +1481,7 @@
@Test
public void testDeleteIndexWhilePermanentFailure() throws Exception {
testRecoveryFailureAfterOneAttemptCompilationFailure();
- Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState());
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
Action drop = users[1].dropIndex(firstDataset, listener);
drop.sync();
assertSuccess(drop);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index c269803..47fc46f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -19,7 +19,6 @@
package org.apache.asterix.test.active;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
import org.apache.asterix.active.ActivityState;
@@ -35,7 +34,6 @@
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -64,14 +62,13 @@
}
@Override
- protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+ protected void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
IActiveEntityEventSubscriber eventSubscriber =
- new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.PERMANENTLY_FAILED));
+ new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED));
try {
eventSubscriber.sync();
} catch (Exception e) {
throw HyracksDataException.create(e);
}
- return null;
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 7e1bc37..c771a94 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -19,11 +19,10 @@
package org.apache.asterix.test.active;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Semaphore;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IRetryPolicyFactory;
@@ -31,17 +30,19 @@
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.LockList;
-import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.util.ExitUtil;
public class TestEventsListener extends ActiveEntityEventsListener {
@@ -50,6 +51,9 @@
RUNNING_JOB_FAIL,
FAIL_COMPILE,
FAIL_RUNTIME,
+ FAIL_START_TIMEOUT_OP_SUCCEED,
+ FAIL_START_TIMEOUT_STUCK,
+ FAIL_STOP_TIMEOUT,
STEP_SUCCEED,
STEP_FAIL_COMPILE,
STEP_FAIL_RUNTIME
@@ -103,9 +107,8 @@
}
}
- @SuppressWarnings("deprecation")
@Override
- protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException {
+ protected JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException {
step(onStart);
try {
metadataProvider.getApplicationContext().getMetadataLockManager()
@@ -119,60 +122,72 @@
try {
startJob.sync();
} catch (InterruptedException e) {
- throw HyracksDataException.create(e);
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
}
- WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
- EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
if (onStart == Behavior.FAIL_RUNTIME || onStart == Behavior.STEP_FAIL_RUNTIME) {
clusterController.jobFinish(jobId, JobStatus.FAILURE,
Collections.singletonList(new HyracksDataException("RuntimeFailure")));
- } else {
+ } else if (onStart != Behavior.FAIL_START_TIMEOUT_OP_SUCCEED && onStart != Behavior.FAIL_START_TIMEOUT_STUCK) {
for (int i = 0; i < nodeControllers.length; i++) {
TestNodeControllerActor nodeController = nodeControllers[i];
nodeController.registerRuntime(jobId, entityId, i);
}
}
- try {
- subscriber.sync();
- if (subscriber.getFailure() != null) {
- throw subscriber.getFailure();
+ if (onStart == Behavior.FAIL_START_TIMEOUT_OP_SUCCEED) {
+ for (int i = 0; i < nodeControllers.length; i++) {
+ TestNodeControllerActor nodeController = nodeControllers[i];
+ try {
+ nodeController.registerRuntime(jobId, entityId, i).sync();
+ } catch (InterruptedException e) {
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+ }
}
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ // At this point, the job has started and both nodes have reported that they started,
+ // but since we're holding the lock on the listener (this is a synchronized method), the state
+ // hasn't changed yet.
+ while (state != ActivityState.RUNNING) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+ }
+ }
+ Thread.currentThread().interrupt();
+ } else if (onStart == Behavior.FAIL_START_TIMEOUT_STUCK) {
+ TestNodeControllerActor nodeController = nodeControllers[0];
+ try {
+ nodeController.registerRuntime(jobId, entityId, 0).sync();
+ } catch (InterruptedException e) {
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+ }
+ Thread.currentThread().interrupt();
}
+ return jobId;
}
- @SuppressWarnings("deprecation")
@Override
- protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
- ActivityState intention = state;
+ protected void cancelJobSafely(MetadataProvider metadataProvider, Throwable th) {
+ clusterController.jobFinish(jobId, JobStatus.FAILURE,
+ Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)));
+ }
+
+ @Override
+ protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception {
step(onStop);
failCompile(onStop);
- try {
- Set<ActivityState> waitFor;
- if (intention == ActivityState.STOPPING) {
- waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED);
- } else if (intention == ActivityState.SUSPENDING) {
- waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
- } else {
- throw new IllegalStateException("stop with what intention??");
+ if (onStop == Behavior.RUNNING_JOB_FAIL) {
+ clusterController.jobFinish(jobId, JobStatus.FAILURE,
+ Collections.singletonList(new HyracksDataException("RuntimeFailure")));
+ } else if (onStop == Behavior.FAIL_STOP_TIMEOUT) {
+ // Nothing happens.
+ Thread.currentThread().interrupt();
+ } else {
+ for (int i = 0; i < nodeControllers.length; i++) {
+ TestNodeControllerActor nodeController = nodeControllers[0];
+ nodeController.deRegisterRuntime(jobId, entityId, i).sync();
}
- WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor);
- if (onStop == Behavior.RUNNING_JOB_FAIL) {
- clusterController.jobFinish(jobId, JobStatus.FAILURE,
- Collections.singletonList(new HyracksDataException("RuntimeFailure")));
- } else {
- for (int i = 0; i < nodeControllers.length; i++) {
- TestNodeControllerActor nodeController = nodeControllers[0];
- nodeController.deRegisterRuntime(jobId, entityId, i).sync();
- }
- clusterController.jobFinish(jobId, JobStatus.TERMINATED, Collections.emptyList());
- }
- subscriber.sync();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ clusterController.jobFinish(jobId, JobStatus.TERMINATED, Collections.emptyList());
}
- return null;
}
public void onStart(Behavior behavior) {
@@ -184,25 +199,31 @@
}
@Override
- protected void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException {
+ protected void setRunning(MetadataProvider metadataProvider, boolean running) {
try {
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
LockList locks = metadataProvider.getLocks();
lockManager.acquireDataverseReadLock(locks, entityId.getDataverse());
lockManager.acquireActiveEntityWriteLock(locks, entityId.getDataverse() + '.' + entityId.getEntityName());
// persist entity
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ } catch (Throwable th) {
+ // This failure puts the system in a bad state.
+ throw new IllegalStateException(th);
}
}
@Override
- protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
- return doStop(metadataProvider);
+ protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
+ doStop(metadataProvider);
}
@Override
protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException {
doStart(metadataProvider);
}
+
+ @Override
+ protected ActiveRuntimeId getActiveRuntimeId(int partition) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 880c4b4..37b157e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -20,6 +20,7 @@
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.hyracks.util.Span;
public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
@@ -55,6 +56,18 @@
}
}
+ public boolean sync(Span span) throws InterruptedException {
+ synchronized (listener) {
+ while (!done) {
+ span.wait(listener);
+ if (done || span.elapsed()) {
+ return done;
+ }
+ }
+ return done;
+ }
+ }
+
public Exception getFailure() {
return failure;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
index 8230b48..627e563 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -21,7 +21,6 @@
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* An event subscriber that does not listen to any events
@@ -49,7 +48,7 @@
}
@Override
- public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+ public void subscribed(IActiveEntityEventsListener eventsListener) {
// no op
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
index a571904..31459d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
@@ -30,24 +30,16 @@
}
@Override
- public void notify(ActiveEvent event) throws HyracksDataException {
+ public void notify(ActiveEvent event) {
if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) {
- try {
- complete(null);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ complete(null);
} else if (event.getEventKind() == ActiveEvent.Kind.FAILURE) {
- try {
- complete((Exception) event.getEventObject());
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ complete((Exception) event.getEventObject());
}
}
@Override
- public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+ public void subscribed(IActiveEntityEventsListener eventsListener) {
//Does nothing upon subscription
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index a1cdfb0..818d826 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -23,39 +23,28 @@
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public class WaitForStateSubscriber extends AbstractSubscriber {
private final Set<ActivityState> targetStates;
- public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates)
- throws HyracksDataException {
+ public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) {
super(listener);
this.targetStates = targetStates;
listener.subscribe(this);
}
@Override
- public void notify(ActiveEvent event) throws HyracksDataException {
+ public void notify(ActiveEvent event) {
if (targetStates.contains(listener.getState())) {
- if (listener.getState() == ActivityState.PERMANENTLY_FAILED
- || listener.getState() == ActivityState.TEMPORARILY_FAILED) {
- complete(listener.getJobFailure());
- } else {
- complete(null);
- }
+ complete(listener.getJobFailure());
} else if (event != null && event.getEventKind() == ActiveEvent.Kind.FAILURE) {
- try {
- complete((Exception) event.getEventObject());
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ complete((Exception) event.getEventObject());
}
}
@Override
- public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+ public void subscribed(IActiveEntityEventsListener eventsListener) {
if (targetStates.contains(listener.getState())) {
complete(null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 5492919..79d3fac 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -39,6 +39,7 @@
public static final int EC_FAILED_TO_ABORT_METADATA_TXN = 7;
public static final int EC_INCONSISTENT_METADATA = 8;
public static final int EC_UNHANDLED_EXCEPTION = 11;
+ public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
public static final int EC_IO_SCHEDULER_FAILED = 55;