[ASTERIXDB-2064][ING] Timeout Stop Feed

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- The abort feed message stops the reader and wait for the
  dataflow controller to signal end of life.
- If the reader returns true to stop but the dataflow controller
  never signal ends, it can get stuck.
- This change adds a timeout after which, the task thread is
  interrupted.

Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1964
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index 7412338..f59b82e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -40,7 +40,7 @@
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
-    public default boolean stop() throws HyracksDataException {
+    public default boolean stop(long timeout) throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index e24c26d..4ed1b08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -213,16 +213,24 @@
         }
     }
 
-    private void waitForSignal() throws InterruptedException {
+    private void waitForSignal(long timeout) throws InterruptedException, HyracksDataException {
+        if (timeout <= 0) {
+            throw new IllegalArgumentException("timeout must be greater than 0");
+        }
         synchronized (closed) {
             while (!closed.get()) {
-                closed.wait();
+                long before = System.currentTimeMillis();
+                closed.wait(timeout);
+                timeout -= System.currentTimeMillis() - before;
+                if (!closed.get() && timeout <= 0) {
+                    throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.TIMEOUT);
+                }
             }
         }
     }
 
     @Override
-    public boolean stop() throws HyracksDataException {
+    public boolean stop(long timeout) throws HyracksDataException {
         synchronized (this) {
             switch (state) {
                 case CREATED:
@@ -238,7 +246,7 @@
         }
         if (recordReader.stop()) {
             try {
-                waitForSignal();
+                waitForSignal(timeout);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 1f1f545..025520e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public boolean stop() throws HyracksDataException {
+    public boolean stop(long timeout) throws HyracksDataException {
         try {
             if (stream.stop()) {
                 return true;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index eeda80c..fd9db7e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -35,8 +35,8 @@
         controller.start(writer);
     }
 
-    public boolean stop() throws HyracksDataException {
-        return controller.stop();
+    public boolean stop(long timeout) throws HyracksDataException {
+        return controller.stop(timeout);
     }
 
     public boolean pause() throws HyracksDataException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 16b8fba..7907e69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
@@ -42,6 +43,8 @@
  */
 public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
     private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+    // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065
+    public static final int DEFAULT_ABORT_TIMEOUT = 10000;
     private final FeedIntakeOperatorDescriptor opDesc;
     private final FeedAdapter adapter;
     private boolean poisoned = false;
@@ -121,9 +124,19 @@
         LOGGER.info(runtimeId + " aborting...");
         synchronized (this) {
             poisoned = true;
-            if (!adapter.stop()) {
-                LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
-                taskThread.interrupt();
+            try {
+                if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) {
+                    LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
+                    taskThread.interrupt();
+                }
+            } catch (HyracksDataException hde) {
+                if (hde.getComponent() == ErrorCode.HYRACKS && hde.getErrorCode() == ErrorCode.TIMEOUT) {
+                    LOGGER.log(Level.WARNING, runtimeId + " stop adapter timed out. interrupting the thread...", hde);
+                    taskThread.interrupt();
+                } else {
+                    LOGGER.log(Level.WARNING, "Failure during attempt to stop " + runtimeId, hde);
+                    throw hde;
+                }
             }
         }
     }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index 2273bea..fcd010d 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -137,7 +137,7 @@
     }
 
     @Override
-    public boolean stop() {
+    public boolean stop(long timeout) {
         generator.stop();
         return true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b054faf..ff98efa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -124,6 +124,7 @@
     public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
     public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
     public static final int ILLEGAL_MEMORY_BUDGET = 90;
+    public static final int TIMEOUT = 91;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1d2143b..6d4ccdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -107,5 +107,6 @@
 88 = Cannot modify index (Disk is full)
 89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes)
 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
+91 = Operation timed out
 
 10000 = The given rule collection %1$s is not an instance of the List class.