Prevent hangs on active runtime stop
Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1608
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 577da5e..46da770 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -20,6 +20,8 @@
import java.util.List;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.library.ILibraryManager;
@@ -52,6 +54,7 @@
public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(GenericAdapterFactory.class.getName());
private IExternalDataSourceFactory dataSourceFactory;
private IDataParserFactory dataParserFactory;
private ARecordType recordType;
@@ -90,6 +93,7 @@
try {
restoreExternalObjects(appCtx.getLibraryManager());
} catch (Exception e) {
+ LOGGER.log(Level.INFO, "Failure restoring external objects", e);
throw HyracksDataException.create(e);
}
if (isFeed) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index 33f262a..def0bf1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -24,7 +24,7 @@
public interface IDataFlowController {
//TODO: Refactor this interface. Remove writer from start() signature
- public void start(IFrameWriter writer) throws HyracksDataException;
+ public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
public default boolean pause() throws HyracksDataException {
throw new HyracksDataException("Method not implemented");
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
index e37f2b1..48df79b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* A super interface implemented by a data source adapter. An adapter can be a
@@ -47,5 +48,5 @@
* operator using the instance of IFrameWriter.
* @throws Exception
*/
- public void start(int partition, IFrameWriter writer) throws Exception;
+ public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 87ee167..213231b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -60,5 +60,5 @@
@Override
public abstract boolean stop() throws HyracksDataException;
- public abstract boolean handleException(Throwable th);
+ public abstract boolean handleException(Throwable th) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 7ba3ae4..1b12dc1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -51,8 +51,7 @@
}
@Override
- public void start(IFrameWriter writer) throws HyracksDataException {
- HyracksDataException hde = null;
+ public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
try {
failed = false;
tupleForwarder.initialize(ctx, writer);
@@ -69,13 +68,24 @@
} catch (InterruptedException e) {
//TODO: Find out what could cause an interrupted exception beside termination of a job/feed
LOGGER.warn("Feed has been interrupted. Closing the feed", e);
- Thread.currentThread().interrupt();
+ failed = true;
+ try {
+ finish();
+ } catch (HyracksDataException hde) {
+ e.addSuppressed(hde);
+ }
+ throw e;
} catch (Exception e) {
failed = true;
tupleForwarder.flush();
LOGGER.warn("Failure while operating a feed source", e);
throw HyracksDataException.create(e);
}
+ finish();
+ }
+
+ private void finish() throws HyracksDataException {
+ HyracksDataException hde = null;
try {
tupleForwarder.close();
} catch (Throwable th) {
@@ -162,9 +172,12 @@
}
@Override
- public boolean handleException(Throwable th) {
+ public boolean handleException(Throwable th) throws HyracksDataException {
// This is not a parser record. most likely, this error happened in the record reader.
- return recordReader.handleException(th);
+ if (!recordReader.handleException(th)) {
+ finish();
+ }
+ return closed.get();
}
public IRecordReader<T> getReader() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index d1bde71..8d80e6f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -32,7 +32,7 @@
}
@Override
- public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+ public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
controller.start(writer);
}
@@ -40,7 +40,7 @@
return controller.stop();
}
- public boolean handleException(Throwable e) {
+ public boolean handleException(Throwable e) throws HyracksDataException {
return controller.handleException(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 3ab370e..0681d71 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -33,7 +33,7 @@
}
@Override
- public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+ public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
controller.start(writer);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 2adce1c..c71b8a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -20,6 +20,7 @@
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
/**
@@ -43,31 +44,45 @@
@Override
public void run() {
// Start by getting the partition number from the manager
- int partition = adapterManager.getPartition();
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Starting ingestion for partition:" + partition);
+ LOGGER.info("Starting ingestion for partition:" + adapterManager.getPartition());
}
+ boolean failed = false;
+ try {
+ failed = doRun();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ failed = true;
+ LOGGER.error("Unhandled Exception", e);
+ } finally {
+ // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying
+ // the runtime manager
+ adapterManager.setFailed(failed);
+ adapterManager.setDone(true);
+ synchronized (adapterManager) {
+ adapterManager.notifyAll();
+ }
+ }
+ }
+
+ private boolean doRun() throws HyracksDataException, InterruptedException {
boolean continueIngestion = true;
boolean failedIngestion = false;
while (continueIngestion) {
try {
// Start the adapter
- adapter.start(partition, writer);
+ adapter.start(adapterManager.getPartition(), writer);
// Adapter has completed execution
continueIngestion = false;
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
LOGGER.error("Exception during feed ingestion ", e);
continueIngestion = adapter.handleException(e);
failedIngestion = !continueIngestion;
}
}
- // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
- // runtime manager
- adapterManager.setFailed(failedIngestion);
- adapterManager.setDone(true);
- synchronized (adapterManager) {
- adapterManager.notifyAll();
- }
+ return failedIngestion;
}
-
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 7f5372b..6214d9f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -19,12 +19,15 @@
package org.apache.asterix.external.feed.runtime;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.log4j.Logger;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* This class manages the execution of an adapter within a feed
@@ -43,15 +46,14 @@
private final IHyracksTaskContext ctx;
- private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed
-
private Future<?> execution;
+ private boolean started = false;
private volatile boolean done = false;
private volatile boolean failed = false;
public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
- IFrameWriter writer, int partition) {
+ IFrameWriter writer, int partition) {
this.ctx = ctx;
this.feedId = entityId;
this.feedAdapter = feedAdapter;
@@ -60,23 +62,42 @@
}
public void start() {
- execution = ctx.getExecutorService().submit(adapterExecutor);
+ synchronized (adapterExecutor) {
+ started = true;
+ if (!done) {
+ execution = ctx.getExecutorService().submit(adapterExecutor);
+ } else {
+ LOGGER.log(Level.WARNING, "Someone stopped me before I even start. I will simply not start");
+ }
+ }
}
- public void stop() throws InterruptedException {
- try {
- if (feedAdapter.stop()) {
- // stop() returned true, we wait for the process termination
- execution.get();
- } else {
- // stop() returned false, we try to force shutdown
- execution.cancel(true);
+ public void stop() throws HyracksDataException, InterruptedException {
+ synchronized (adapterExecutor) {
+ try {
+ if (started) {
+ try {
+ ctx.getExecutorService().submit(() -> {
+ if (feedAdapter.stop()) {
+ execution.get();
+ }
+ return null;
+ }).get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e);
+ throw e;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e);
+ throw HyracksDataException.create(e);
+ } finally {
+ execution.cancel(true);
+ }
+ } else {
+ LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts");
+ }
+ } finally {
+ done = true;
}
- } catch (InterruptedException e) {
- LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
- throw e;
- } catch (Exception exception) {
- LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
}
}
@@ -101,10 +122,6 @@
return partition;
}
- public IngestionRuntime getIngestionRuntime() {
- return ingestionRuntime;
- }
-
public boolean isFailed() {
return failed;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 3100704..590af01 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -24,6 +24,7 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveRuntime;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class IngestionRuntime implements IActiveRuntime {
@@ -50,7 +51,7 @@
}
@Override
- public void stop() throws InterruptedException {
+ public void stop() throws InterruptedException, HyracksDataException {
adapterRuntimeManager.stop();
LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId);
}