[NO ISSUE][ING] Close and return on interrupt of ingestion
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Previously, we would still attempt to write through the network
in case an ingestion task is interrupted.
- The goal was to try and get as much data in as possible but in
the case where the cluster was in a bad state, this could
lead to hanging threads.
- After this change, each record reader must implement the stop
method correctly to allow for graceful stop while interrupts
will always mean abort the task and return as soon as possible.
Change-Id: I6119617d133fb161a48b39f9812ec79e0189975b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2428
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 1a83603..04054f1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -250,7 +250,6 @@
public static final int ACTIVE_RUNTIME_IS_ALREADY_REGISTERED = 3105;
public static final int ACTIVE_RUNTIME_IS_NOT_REGISTERED = 3106;
public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
- public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 995b541..1fb8480 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -239,7 +239,6 @@
3105 = %1$s is already registered
3106 = %1$s is not registered
3107 = Active Notification Handler is already suspended
-3108 = Feed stopped while waiting for a new record
3109 = Function %1$s is being used. It cannot be dropped
3110 = Feed failed while reading a new record
3111 = Feed %1$s is not connected to any dataset
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 306a2a5..9826be7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -95,11 +95,7 @@
}
} catch (HyracksDataException e) {
LOGGER.log(Level.WARN, "Exception during ingestion", e);
- //if interrupted while waiting for a new record, then it is safe to not fail forward
if (e.getComponent() == ErrorCode.ASTERIX
- && (e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) {
- // Do nothing. interrupted by the active manager
- } else if (e.getComponent() == ErrorCode.ASTERIX
&& (e.getErrorCode() == ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
// Failure but we know we can for sure push the previously parsed records safely
failure = e;
@@ -141,11 +137,8 @@
private IRawRecord<? extends T> next() throws Exception {
try {
return recordReader.next();
- } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
- if (flushing) {
- throw e;
- }
- throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
if (flushing) {
throw e;
@@ -161,11 +154,8 @@
while (true) {
try {
return recordReader.hasNext();
- } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
- if (flushing) {
- throw e;
- }
- throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
if (flushing) {
throw e;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 5a7b4b9..bb9d8c9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Status;
@@ -43,6 +44,7 @@
private int nextTweetIndex = 0;
private long lastTweetIdReceived = 0;
private GenericRecord<String> record;
+ private boolean stopped = false;
public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) {
this.twitter = twitter;
@@ -59,7 +61,7 @@
@Override
public boolean hasNext() throws Exception {
- return true;
+ return !stopped;
}
@Override
@@ -90,7 +92,8 @@
@Override
public boolean stop() {
- return false;
+ stopped = true;
+ return true;
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index e1c339c..6c95cc2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -44,7 +44,7 @@
public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
private static final Logger LOGGER = LogManager.getLogger();
// TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065
- public static final int DEFAULT_ABORT_TIMEOUT = 10000;
+ public static final int DEFAULT_ABORT_TIMEOUT = 60000;
private final FeedIntakeOperatorDescriptor opDesc;
private final FeedAdapter adapter;
private boolean poisoned = false;