Remove use of static ctx
Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 2a11e13..0360ee3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -23,6 +23,7 @@
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.SessionConfig;
@@ -30,9 +31,9 @@
public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
@Override
- public QueryTranslator create(List<Statement> statements, SessionConfig conf,
+ public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
- return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider,
+ return new BADStatementExecutor(appCtx, statements, conf, compilationProvider, storageComponentProvider,
executorService);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 7e45fd6..8d4b1e5 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
import org.apache.asterix.optimizer.base.RuleCollections;
@@ -35,11 +36,12 @@
public class BADRuleSetFactory implements IRuleSetFactory {
@Override
- public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
- throws AlgebricksException {
- List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(
+ ICcApplicationContext appCtx) throws AlgebricksException {
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet =
+ DefaultRuleSetFactory.buildLogical(appCtx);
- List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
+ List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection(appCtx);
List<IAlgebraicRewriteRule> alteredNormalizationCollection = new ArrayList<>();
alteredNormalizationCollection.addAll(normalizationCollection);
@@ -54,7 +56,7 @@
//Find instances of the normalization collection and replace them with the new one
SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
- for (int i =0; i < logicalRuleSet.size(); i++){
+ for (int i = 0; i < logicalRuleSet.size(); i++) {
List<IAlgebraicRewriteRule> collection = logicalRuleSet.get(i).second;
if (collection.size() == normalizationCollection.size()) {
boolean isNormalizationCollection = true;
@@ -75,7 +77,8 @@
}
@Override
- public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
- return DefaultRuleSetFactory.buildPhysical();
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(
+ ICcApplicationContext appCtx) {
+ return DefaultRuleSetFactory.buildPhysical(appCtx);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index f62a7e0..0f2e212 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -29,6 +29,7 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
@@ -42,13 +43,12 @@
public class BADStatementExecutor extends QueryTranslator {
- public BADStatementExecutor(List<Statement> statements, SessionConfig conf,
+ public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
ExecutorService executorService) {
- super(statements, conf, compliationProvider, storageComponentProvider, executorService);
+ super(appCtx, statements, conf, compliationProvider, storageComponentProvider, executorService);
}
-
@Override
protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
@@ -59,7 +59,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
@@ -70,8 +70,8 @@
List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
for (Channel channel : channels) {
tempMdProvider.getLocks().reset();
- ChannelDropStatement drop = new ChannelDropStatement(dvId,
- new Identifier(channel.getChannelId().getEntityName()), false);
+ ChannelDropStatement drop =
+ new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 854ae07..1b655da 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -19,6 +19,7 @@
package org.apache.asterix.bad.lang.statement;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
@@ -85,12 +87,14 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
Channel channel = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -111,13 +115,13 @@
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+ activeEventHandler.removeListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
//Create a metadata provider to use in nested jobs.
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Drop the Channel Datasets
@@ -131,7 +135,6 @@
new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
-
//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 305ca20..60f871e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -128,8 +128,7 @@
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
- String brokerDataverse = ((QueryTranslator) statementExecutor)
-.getActiveDataverse(brokerDataverseName);
+ String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -153,7 +152,7 @@
Query subscriptionTuple = new Query(false);
- List<FieldBinding> fb = new ArrayList<FieldBinding>();
+ List<FieldBinding> fb = new ArrayList<>();
LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
fb.add(new FieldBinding(leftExpr, rightExpr));
@@ -165,11 +164,11 @@
if (subscriptionId != null) {
leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
- List<Expression> UUIDList = new ArrayList<Expression>();
+ List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
- FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
- function.getArity());
+ FunctionSignature UUIDfunc =
+ new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
rightExpr = UUIDCall;
@@ -186,8 +185,8 @@
subscriptionTuple.setVarCounter(varCounter);
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
if (subscriptionId == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 0cc96ad..60de69e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -129,12 +129,12 @@
condition.setCurrentop(true);
condition.addOperator("=");
- List<Expression> UUIDList = new ArrayList<Expression>();
+ List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
- FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
- function.getArity());
+ FunctionSignature UUIDfunc =
+ new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
condition.addOperand(UUIDCall);
@@ -143,8 +143,8 @@
new Identifier(subscriptionsDatasetName), condition, varCounter);
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
delete.accept(visitor, null);
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 19ea29d..571a2d7 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -30,6 +30,7 @@
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -41,6 +42,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -174,10 +176,10 @@
Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
- List<List<String>> partitionFields = new ArrayList<List<String>>();
- List<Integer> keyIndicators = new ArrayList<Integer>();
+ List<List<String>> partitionFields = new ArrayList<>();
+ List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
- List<String> fieldNames = new ArrayList<String>();
+ List<String> fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
@@ -186,8 +188,8 @@
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
//Setup the results dataset
- partitionFields = new ArrayList<List<String>>();
- fieldNames = new ArrayList<String>();
+ partitionFields = new ArrayList<>();
+ fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
@@ -238,8 +240,7 @@
}
private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
- PrecompiledJobEventListener listener, boolean predistributed)
- throws Exception {
+ PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
if (channeljobSpec != null) {
//TODO: Find a way to fix optimizer tests so we don't need this check
JobId jobId = null;
@@ -272,8 +273,11 @@
Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
boolean alreadyActive = false;
Channel channel = null;
@@ -302,8 +306,8 @@
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
@@ -320,7 +324,7 @@
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(entityId, PrecompiledType.CHANNEL, datasets);
- ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ activeEventHandler.registerListener(listener);
}
if (distributed) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index d378890..7373337 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -27,6 +27,7 @@
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.result.ResultReader;
@@ -37,6 +38,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -87,7 +89,7 @@
Expression period) {
this.signature = signature;
this.functionBody = functionBody;
- this.paramList = new ArrayList<String>();
+ this.paramList = new ArrayList<>();
for (VarIdentifier varId : parameterList) {
this.paramList.add(varId.getValue());
}
@@ -168,7 +170,8 @@
throw new CompilationException("Procedure can only execute a single statement");
}
if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
- return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ return new Pair<>(
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats, true, null, null),
PrecompiledType.INSERT);
} else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
@@ -182,14 +185,14 @@
fStatements.get(0).accept(visitor, null);
return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
fStatements.get(0), hcc, true), PrecompiledType.DELETE);
- }else{
+ } else {
throw new CompilationException("Procedure can only execute a single delete, insert, or query");
}
}
private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
- throws Exception {
+ throws Exception {
JobId jobId = hcc.distributeJob(jobSpec);
listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
}
@@ -198,15 +201,15 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
initialize();
-
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
-
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+ (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
boolean alreadyActive = false;
Procedure procedure = null;
@@ -229,8 +232,8 @@
procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
@@ -250,7 +253,7 @@
if (listener == null) {
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>());
- ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ activeEventHandler.registerListener(listener);
}
setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
stats);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 1a319a1..a8ec9aa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.api.http.server.ResultUtil;
@@ -33,6 +34,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,21 +94,21 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
-
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
txnActive = true;
- procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, procedureName,
- Integer.toString(getArity()));
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, procedureName, Integer.toString(getArity()));
if (procedure == null) {
throw new AlgebricksException("There is no procedure with this name " + procedureName + ".");
}
@@ -118,8 +120,8 @@
if (listener.getType() == PrecompiledType.QUERY) {
hcc.waitForCompletion(hyracksJobId);
ResultReader resultReader = listener.getResultReader();
- ResultUtil.printResults(resultReader, ((QueryTranslator) statementExecutor).getSessionConfig(),
- new Stats(), null);
+ ResultUtil.printResults(appCtx, resultReader,
+ ((QueryTranslator) statementExecutor).getSessionConfig(), new Stats(), null);
}
} else {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index abdf90a..f7c3a74 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -19,6 +19,7 @@
package org.apache.asterix.bad.lang.statement;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.struct.Identifier;
@@ -79,15 +81,17 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
FunctionSignature signature = getFunctionSignature();
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
signature.setNamespace(dataverse);
-
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -111,7 +115,7 @@
}
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+ activeEventHandler.removeListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index c5b7ef2..a246193 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -26,7 +26,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -51,8 +51,8 @@
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
- private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
- new AOrderedListType(BuiltinType.AUUID, null));
+ private final AOrderedListSerializerDeserializer subSerDes =
+ new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
@@ -70,7 +70,7 @@
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = subEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
- this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
+ this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
}