Prevent case where drop channel hangs indefinitely
Change-Id: I7ed8efea454c19b2d0b86f01b196bb361d35450f
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 548f1ba..3aca099 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -19,38 +19,39 @@
package org.apache.asterix.bad;
public interface BADConstants {
- public static final String SubscriptionId = "subscriptionId";
- public static final String BrokerName = "BrokerName";
- public static final String ChannelName = "ChannelName";
- public static final String ProcedureName = "ProcedureName";
- public static final String DataverseName = "DataverseName";
- public static final String BrokerEndPoint = "BrokerEndPoint";
- public static final String DeliveryTime = "deliveryTime";
- public static final String ResultId = "resultId";
- public static final String ChannelExecutionTime = "channelExecutionTime";
- public static final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
- public static final String ChannelResultsType = "ChannelResultsType";
- public static final String ResultsDatasetName = "ResultsDatasetName";
- public static final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
- public static final String CHANNEL_EXTENSION_NAME = "Channel";
- public static final String PROCEDURE_KEYWORD = "Procedure";
- public static final String BROKER_KEYWORD = "Broker";
- public static final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
- public static final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
- public static final String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
- public static final String subscriptionEnding = "Subscriptions";
- public static final String resultsEnding = "Results";
- public static final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
- public static final String BAD_DATAVERSE_NAME = "Metadata";
- public static final String Duration = "Duration";
- public static final String Function = "Function";
- public static final String FIELD_NAME_ARITY = "Arity";
- public static final String FIELD_NAME_PARAMS = "Params";
- public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
- public static final String FIELD_NAME_DEFINITION = "Definition";
- public static final String FIELD_NAME_LANGUAGE = "Language";
+ String SubscriptionId = "subscriptionId";
+ String BrokerName = "BrokerName";
+ String ChannelName = "ChannelName";
+ String ProcedureName = "ProcedureName";
+ String DataverseName = "DataverseName";
+ String BrokerEndPoint = "BrokerEndPoint";
+ String DeliveryTime = "deliveryTime";
+ String ResultId = "resultId";
+ String ChannelExecutionTime = "channelExecutionTime";
+ String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+ String ChannelResultsType = "ChannelResultsType";
+ String ResultsDatasetName = "ResultsDatasetName";
+ String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+ String CHANNEL_EXTENSION_NAME = "Channel";
+ String PROCEDURE_KEYWORD = "Procedure";
+ String BROKER_KEYWORD = "Broker";
+ String RECORD_TYPENAME_BROKER = "BrokerRecordType";
+ String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
+ String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
+ String subscriptionEnding = "Subscriptions";
+ String resultsEnding = "Results";
+ String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
+ String BAD_DATAVERSE_NAME = "Metadata";
+ String Duration = "Duration";
+ String Function = "Function";
+ String FIELD_NAME_ARITY = "Arity";
+ String FIELD_NAME_PARAMS = "Params";
+ String FIELD_NAME_RETURN_TYPE = "ReturnType";
+ String FIELD_NAME_DEFINITION = "Definition";
+ String FIELD_NAME_LANGUAGE = "Language";
//To enable new Asterix TxnId for separate deployed job spec invocations
- public static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
+ byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
+ int EXECUTOR_TIMEOUT = 20;
public enum ChannelJobType {
REPETITIVE
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 2b189be..80355c0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -122,7 +122,11 @@
} else {
listener.getExecutorService().shutdown();
- listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.SEVERE,
+ "Executor Service is terminating non-gracefully for: " + entityId.getExtensionName() + " "
+ + entityId.getDataverse() + "." + entityId.getEntityName());
+ }
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();
activeEventHandler.unregisterListener(listener);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 18e769d..1555bea 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -120,7 +120,12 @@
} else {
if (listener.getExecutorService() != null) {
listener.getExecutorService().shutdown();
- listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT,
+ TimeUnit.SECONDS)) {
+ LOGGER.log(Level.SEVERE,
+ "Executor Service is terminating non-gracefully for: " + entityId.getExtensionName()
+ + " " + entityId.getDataverse() + "." + entityId.getEntityName());
+ }
}
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();