[ASTERIXDB-2064][ING] Timeout Stop Feed
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- The abort feed message stops the reader and wait for the
dataflow controller to signal end of life.
- If the reader returns true to stop but the dataflow controller
never signal ends, it can get stuck.
- This change adds a timeout after which, the task thread is
interrupted.
Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1964
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index 7412338..f59b82e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -40,7 +40,7 @@
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
- public default boolean stop() throws HyracksDataException {
+ public default boolean stop(long timeout) throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index e24c26d..4ed1b08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -213,16 +213,24 @@
}
}
- private void waitForSignal() throws InterruptedException {
+ private void waitForSignal(long timeout) throws InterruptedException, HyracksDataException {
+ if (timeout <= 0) {
+ throw new IllegalArgumentException("timeout must be greater than 0");
+ }
synchronized (closed) {
while (!closed.get()) {
- closed.wait();
+ long before = System.currentTimeMillis();
+ closed.wait(timeout);
+ timeout -= System.currentTimeMillis() - before;
+ if (!closed.get() && timeout <= 0) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.TIMEOUT);
+ }
}
}
}
@Override
- public boolean stop() throws HyracksDataException {
+ public boolean stop(long timeout) throws HyracksDataException {
synchronized (this) {
switch (state) {
case CREATED:
@@ -238,7 +246,7 @@
}
if (recordReader.stop()) {
try {
- waitForSignal();
+ waitForSignal(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 1f1f545..025520e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -71,7 +71,7 @@
}
@Override
- public boolean stop() throws HyracksDataException {
+ public boolean stop(long timeout) throws HyracksDataException {
try {
if (stream.stop()) {
return true;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index eeda80c..fd9db7e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -35,8 +35,8 @@
controller.start(writer);
}
- public boolean stop() throws HyracksDataException {
- return controller.stop();
+ public boolean stop(long timeout) throws HyracksDataException {
+ return controller.stop(timeout);
}
public boolean pause() throws HyracksDataException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 16b8fba..7907e69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
@@ -42,6 +43,8 @@
*/
public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+ // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065
+ public static final int DEFAULT_ABORT_TIMEOUT = 10000;
private final FeedIntakeOperatorDescriptor opDesc;
private final FeedAdapter adapter;
private boolean poisoned = false;
@@ -121,9 +124,19 @@
LOGGER.info(runtimeId + " aborting...");
synchronized (this) {
poisoned = true;
- if (!adapter.stop()) {
- LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
- taskThread.interrupt();
+ try {
+ if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) {
+ LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
+ taskThread.interrupt();
+ }
+ } catch (HyracksDataException hde) {
+ if (hde.getComponent() == ErrorCode.HYRACKS && hde.getErrorCode() == ErrorCode.TIMEOUT) {
+ LOGGER.log(Level.WARNING, runtimeId + " stop adapter timed out. interrupting the thread...", hde);
+ taskThread.interrupt();
+ } else {
+ LOGGER.log(Level.WARNING, "Failure during attempt to stop " + runtimeId, hde);
+ throw hde;
+ }
}
}
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index 2273bea..fcd010d 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -137,7 +137,7 @@
}
@Override
- public boolean stop() {
+ public boolean stop(long timeout) {
generator.stop();
return true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b054faf..ff98efa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -124,6 +124,7 @@
public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
public static final int ILLEGAL_MEMORY_BUDGET = 90;
+ public static final int TIMEOUT = 91;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1d2143b..6d4ccdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -107,5 +107,6 @@
88 = Cannot modify index (Disk is full)
89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes)
90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
+91 = Operation timed out
10000 = The given rule collection %1$s is not an instance of the List class.