[NO ISSUE][ING] Add timeout to stop active message
Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2762
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 2aebc59..6373d6c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -33,6 +33,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
+import org.apache.asterix.active.message.StopRuntimeParameters;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -40,6 +41,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -139,7 +141,7 @@
shutdown = true;
runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> {
// we may already have been stopped- only stop once
- stopIfRunning(runtimeId, runtime);
+ stopIfRunning(runtime, SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
return null;
})));
stopFutures.entrySet().parallelStream().forEach(entry -> {
@@ -157,8 +159,10 @@
LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
}
+ @SuppressWarnings("squid:S1181") // Catch Error
private void stopRuntime(ActiveManagerMessage message) {
- ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+ StopRuntimeParameters content = (StopRuntimeParameters) message.getPayload();
+ ActiveRuntimeId runtimeId = content.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
LOGGER.warn("Request to stop runtime: " + runtimeId
@@ -167,21 +171,23 @@
} else {
executor.execute(() -> {
try {
- stopIfRunning(runtimeId, runtime);
+ stopIfRunning(runtime, content.getTimeout(), content.getUnit());
} catch (Exception e) {
- // TODO(till) Figure out a better way to handle failure to stop a runtime
- LOGGER.log(Level.WARN, "Failed to stop runtime: " + runtimeId, e);
+ LOGGER.warn("Failed to stop runtime: {}", runtimeId, e);
+ } catch (Throwable th) {
+ LOGGER.warn("Failed to stop runtime: {}", runtimeId, th);
+ ExitUtil.halt(ExitUtil.EC_UNCAUGHT_THROWABLE);
}
});
}
}
- private void stopIfRunning(ActiveRuntimeId runtimeId, IActiveRuntime runtime)
+ private void stopIfRunning(IActiveRuntime runtime, long timeout, TimeUnit unit)
throws HyracksDataException, InterruptedException {
- if (runtimes.containsKey(runtimeId)) {
- runtime.stop();
+ if (runtimes.containsKey(runtime.getRuntimeId())) {
+ runtime.stop(timeout, unit);
} else {
- LOGGER.info("Not stopping already stopped runtime " + runtimeId);
+ LOGGER.info("Not stopping already stopped runtime {}", runtime.getRuntimeId());
}
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 2f30df4..af9d109 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.active;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -62,10 +64,10 @@
protected abstract void start() throws HyracksDataException, InterruptedException;
@Override
- public final void stop() throws HyracksDataException, InterruptedException {
+ public final void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException {
synchronized (this) {
if (!done) {
- abort();
+ abort(timeout, unit);
}
while (!done) {
wait();
@@ -76,10 +78,13 @@
/**
* called from a different thread. This method stops the active node and force the start() call to return
*
+ * @param unit
+ * @param timeout
+ *
* @throws HyracksDataException
* @throws InterruptedException
*/
- protected abstract void abort() throws HyracksDataException, InterruptedException;
+ protected abstract void abort(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException;
@Override
public String toString() {
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index f37b2e8..2da7193 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.active;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -31,10 +33,15 @@
/**
* Stops the running activity
*
+ * @param timeout
+ * time for graceful stop. interrupt the runtime after that
+ * @param unit
+ * unit of the timeout
+ *
* @throws HyracksDataException
* @throws InterruptedException
*/
- void stop() throws HyracksDataException, InterruptedException;
+ void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException;
/**
* @return the job id associated with this active runtime
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
new file mode 100644
index 0000000..fbc41a1
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+
+public class StopRuntimeParameters implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final ActiveRuntimeId runtimeId;
+ private final long timeout;
+ private final TimeUnit unit;
+
+ public StopRuntimeParameters(ActiveRuntimeId runtimeId, long timeout, TimeUnit unit) {
+ this.runtimeId = runtimeId;
+ this.timeout = timeout;
+ this.unit = unit;
+ }
+
+ public ActiveRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public TimeUnit getUnit() {
+ return unit;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 66d3e81..be43a4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -40,6 +40,7 @@
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
+import org.apache.asterix.active.message.StopRuntimeParameters;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -73,7 +74,10 @@
private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING,
ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING, ActivityState.CANCELLING);
private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
- // finals
+ // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065
+ protected static final long STOP_MESSAGE_TIMEOUT = 2L;
+ protected static final long SUSPEND_MESSAGE_TIMEOUT = 5L;
+ protected static final TimeUnit TIMEOUT_UNIT = TimeUnit.MINUTES;
protected final IClusterStateManager clusterStateManager;
protected final ActiveNotificationHandler handler;
protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
@@ -431,7 +435,8 @@
protected abstract JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException;
@SuppressWarnings("squid:S1181")
- protected synchronized void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+ protected synchronized void doStop(MetadataProvider metadataProvider, long timeout, TimeUnit unit)
+ throws HyracksDataException {
ActivityState intention = state;
Set<ActivityState> waitFor;
if (intention == ActivityState.STOPPING) {
@@ -444,7 +449,7 @@
WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor);
// Note: once we start sending stop messages, we can't go back until the entity is stopped
try {
- sendStopMessages(metadataProvider);
+ sendStopMessages(metadataProvider, timeout, unit);
LOGGER.log(Level.DEBUG, "Waiting for its state to become " + waitFor);
subscriber.sync();
LOGGER.log(Level.DEBUG, "Disconnect has been completed " + waitFor);
@@ -465,7 +470,7 @@
LOGGER.warn("Failure encountered while stopping {}", this, e);
}
- protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception {
+ protected void sendStopMessages(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws Exception {
ICcApplicationContext applicationCtx = metadataProvider.getApplicationContext();
ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker();
AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations();
@@ -473,9 +478,9 @@
LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
for (String location : runtimeLocations.getLocations()) {
LOGGER.log(Level.INFO, "Sending to " + location);
- messageBroker.sendApplicationMessageToNC(
- new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, getActiveRuntimeId(partition++)),
- location);
+ ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
+ messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
+ new StopRuntimeParameters(runtimeId, timeout, unit)), location);
}
}
@@ -510,7 +515,7 @@
} else if (state == ActivityState.RUNNING) {
setState(ActivityState.STOPPING);
try {
- doStop(metadataProvider);
+ doStop(metadataProvider, STOP_MESSAGE_TIMEOUT, TIMEOUT_UNIT);
} catch (Exception e) {
setState(ActivityState.STOPPED);
LOGGER.log(Level.ERROR, "Failed to stop the entity " + entityId, e);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 47fc46f..88f1332 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
@@ -62,7 +63,7 @@
}
@Override
- protected void doStop(MetadataProvider metadataProvider) throws HyracksDataException {
+ protected void doStop(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws HyracksDataException {
IActiveEntityEventSubscriber eventSubscriber =
new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED));
try {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index c771a94..80806f3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
@@ -172,7 +173,7 @@
}
@Override
- protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception {
+ protected void sendStopMessages(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws Exception {
step(onStop);
failCompile(onStop);
if (onStop == Behavior.RUNNING_JOB_FAIL) {
@@ -214,7 +215,7 @@
@Override
protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
- doStop(metadataProvider);
+ doStop(metadataProvider, SUSPEND_MESSAGE_TIMEOUT, TIMEOUT_UNIT);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 867fb60..98f75df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.operators;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
@@ -44,8 +46,6 @@
*/
public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
private static final Logger LOGGER = LogManager.getLogger();
- // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065
- public static final int DEFAULT_ABORT_TIMEOUT = 60000;
private final FeedIntakeOperatorDescriptor opDesc;
private final FeedAdapter adapter;
private boolean poisoned = false;
@@ -125,12 +125,12 @@
}
@Override
- protected void abort() throws HyracksDataException, InterruptedException {
+ protected void abort(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException {
LOGGER.info(runtimeId + " aborting...");
synchronized (this) {
poisoned = true;
try {
- if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) {
+ if (!adapter.stop(unit.toMillis(timeout))) {
LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
taskThread.interrupt();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index c5a6de2..ea86298 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -65,7 +65,8 @@
writer.close();
} catch (Throwable th) { // NOSONAR Will be suppressed
try {
- LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
+ LOGGER.log(Level.WARN, "Failure closing a closeable resource of class {}",
+ writer.getClass().getSimpleName(), th);
} catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
// NOSONAR: Ignore logging failure
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 79d3fac..14cfc59 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -38,6 +38,7 @@
public static final int EC_FAILED_TO_COMMIT_METADATA_TXN = 6;
public static final int EC_FAILED_TO_ABORT_METADATA_TXN = 7;
public static final int EC_INCONSISTENT_METADATA = 8;
+ public static final int EC_UNCAUGHT_THROWABLE = 9;
public static final int EC_UNHANDLED_EXCEPTION = 11;
public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
public static final int EC_IMMEDIATE_HALT = 33;