Remove use of static ctx

Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 2a11e13..0360ee3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.translator.SessionConfig;
@@ -30,9 +31,9 @@
 public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
 
     @Override
-    public QueryTranslator create(List<Statement> statements, SessionConfig conf,
+    public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
-        return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider,
+        return new BADStatementExecutor(appCtx, statements, conf, compilationProvider, storageComponentProvider,
                 executorService);
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 7e45fd6..8d4b1e5 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
 import org.apache.asterix.compiler.provider.IRuleSetFactory;
 import org.apache.asterix.optimizer.base.RuleCollections;
@@ -35,11 +36,12 @@
 public class BADRuleSetFactory implements IRuleSetFactory {
 
     @Override
-    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
-            throws AlgebricksException {
-        List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(
+            ICcApplicationContext appCtx) throws AlgebricksException {
+        List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet =
+                DefaultRuleSetFactory.buildLogical(appCtx);
 
-        List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
+        List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection(appCtx);
         List<IAlgebraicRewriteRule> alteredNormalizationCollection = new ArrayList<>();
         alteredNormalizationCollection.addAll(normalizationCollection);
 
@@ -54,7 +56,7 @@
 
         //Find instances of the normalization collection and replace them with the new one
         SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
-        for (int i =0; i < logicalRuleSet.size(); i++){
+        for (int i = 0; i < logicalRuleSet.size(); i++) {
             List<IAlgebraicRewriteRule> collection = logicalRuleSet.get(i).second;
             if (collection.size() == normalizationCollection.size()) {
                 boolean isNormalizationCollection = true;
@@ -75,7 +77,8 @@
     }
 
     @Override
-    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
-        return DefaultRuleSetFactory.buildPhysical();
+    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(
+            ICcApplicationContext appCtx) {
+        return DefaultRuleSetFactory.buildPhysical(appCtx);
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index f62a7e0..0f2e212 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
@@ -42,13 +43,12 @@
 
 public class BADStatementExecutor extends QueryTranslator {
 
-    public BADStatementExecutor(List<Statement> statements, SessionConfig conf,
+    public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
             ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
             ExecutorService executorService) {
-        super(statements, conf, compliationProvider, storageComponentProvider, executorService);
+        super(appCtx, statements, conf, compliationProvider, storageComponentProvider, executorService);
     }
 
-
     @Override
     protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
@@ -59,7 +59,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
         List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
-        MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+        MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
                 metadataProvider.getStorageComponentProvider());
         tempMdProvider.setConfig(metadataProvider.getConfig());
         for (Broker broker : brokers) {
@@ -70,8 +70,8 @@
         List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
         for (Channel channel : channels) {
             tempMdProvider.getLocks().reset();
-            ChannelDropStatement drop = new ChannelDropStatement(dvId,
-                    new Identifier(channel.getChannelId().getEntityName()), false);
+            ChannelDropStatement drop =
+                    new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false);
             drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 854ae07..1b655da 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.bad.lang.statement;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
@@ -85,12 +87,14 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -111,13 +115,13 @@
             listener.getExecutorService().shutdownNow();
             JobId hyracksJobId = listener.getJobId();
             listener.deActivate();
-            ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+            activeEventHandler.removeListener(listener);
             if (hyracksJobId != null) {
                 hcc.destroyJob(hyracksJobId);
             }
 
             //Create a metadata provider to use in nested jobs.
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
                     metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Drop the Channel Datasets
@@ -131,7 +135,6 @@
                     new Identifier(channel.getSubscriptionsDataset()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
 
-
             //Remove the Channel Metadata
             MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 305ca20..60f871e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -128,8 +128,7 @@
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
 
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-        String brokerDataverse = ((QueryTranslator) statementExecutor)
-.getActiveDataverse(brokerDataverseName);
+        String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);
 
         MetadataTransactionContext mdTxnCtx = null;
         try {
@@ -153,7 +152,7 @@
 
             Query subscriptionTuple = new Query(false);
 
-            List<FieldBinding> fb = new ArrayList<FieldBinding>();
+            List<FieldBinding> fb = new ArrayList<>();
             LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
             Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
             fb.add(new FieldBinding(leftExpr, rightExpr));
@@ -165,11 +164,11 @@
             if (subscriptionId != null) {
                 leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
 
-                List<Expression> UUIDList = new ArrayList<Expression>();
+                List<Expression> UUIDList = new ArrayList<>();
                 UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
                 FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
-                FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
-                        function.getArity());
+                FunctionSignature UUIDfunc =
+                        new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
                 CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
 
                 rightExpr = UUIDCall;
@@ -186,8 +185,8 @@
 
             subscriptionTuple.setVarCounter(varCounter);
 
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
 
             if (subscriptionId == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 0cc96ad..60de69e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -129,12 +129,12 @@
             condition.setCurrentop(true);
             condition.addOperator("=");
 
-            List<Expression> UUIDList = new ArrayList<Expression>();
+            List<Expression> UUIDList = new ArrayList<>();
             UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
 
             FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
-            FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
-                    function.getArity());
+            FunctionSignature UUIDfunc =
+                    new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
             CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
 
             condition.addOperand(UUIDCall);
@@ -143,8 +143,8 @@
                     new Identifier(subscriptionsDatasetName), condition, varCounter);
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             delete.accept(visitor, null);
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 19ea29d..571a2d7 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -30,6 +30,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -41,6 +42,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -174,10 +176,10 @@
         Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
         Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
         //Setup the subscriptions dataset
-        List<List<String>> partitionFields = new ArrayList<List<String>>();
-        List<Integer> keyIndicators = new ArrayList<Integer>();
+        List<List<String>> partitionFields = new ArrayList<>();
+        List<Integer> keyIndicators = new ArrayList<>();
         keyIndicators.add(0);
-        List<String> fieldNames = new ArrayList<String>();
+        List<String> fieldNames = new ArrayList<>();
         fieldNames.add(BADConstants.SubscriptionId);
         partitionFields.add(fieldNames);
         IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
@@ -186,8 +188,8 @@
                 new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
 
         //Setup the results dataset
-        partitionFields = new ArrayList<List<String>>();
-        fieldNames = new ArrayList<String>();
+        partitionFields = new ArrayList<>();
+        fieldNames = new ArrayList<>();
         fieldNames.add(BADConstants.ResultId);
         partitionFields.add(fieldNames);
         idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
@@ -238,8 +240,7 @@
     }
 
     private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, boolean predistributed)
-            throws Exception {
+            PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
         if (channeljobSpec != null) {
             //TODO: Find a way to fix optimizer tests so we don't need this check
             JobId jobId = null;
@@ -272,8 +273,11 @@
         Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
         Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
         boolean alreadyActive = false;
         Channel channel = null;
 
@@ -302,8 +306,8 @@
             if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Create Channel Datasets
             createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
@@ -320,7 +324,7 @@
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
                 //TODO: Add datasets used by channel function
                 listener = new PrecompiledJobEventListener(entityId, PrecompiledType.CHANNEL, datasets);
-                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+                activeEventHandler.registerListener(listener);
             }
 
             if (distributed) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index d378890..7373337 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -27,6 +27,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.result.ResultReader;
@@ -37,6 +38,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -87,7 +89,7 @@
             Expression period) {
         this.signature = signature;
         this.functionBody = functionBody;
-        this.paramList = new ArrayList<String>();
+        this.paramList = new ArrayList<>();
         for (VarIdentifier varId : parameterList) {
             this.paramList.add(varId.getValue());
         }
@@ -168,7 +170,8 @@
             throw new CompilationException("Procedure can only execute a single statement");
         }
         if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
-            return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+            return new Pair<>(
+                    ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
                             fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats, true, null, null),
                     PrecompiledType.INSERT);
         } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
@@ -182,14 +185,14 @@
             fStatements.get(0).accept(visitor, null);
             return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
                     fStatements.get(0), hcc, true), PrecompiledType.DELETE);
-        }else{
+        } else {
             throw new CompilationException("Procedure can only execute a single delete, insert, or query");
         }
     }
 
     private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
             PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
-                    throws Exception {
+            throws Exception {
         JobId jobId = hcc.distributeJob(jobSpec);
         listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
     }
@@ -198,15 +201,15 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
         initialize();
-
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
-
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
         PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
         boolean alreadyActive = false;
         Procedure procedure = null;
 
@@ -229,8 +232,8 @@
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
                     Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
 
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
 
             metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
@@ -250,7 +253,7 @@
             if (listener == null) {
                 //TODO: Add datasets used by channel function
                 listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>());
-                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+                activeEventHandler.registerListener(listener);
             }
             setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
                     stats);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 1a319a1..a8ec9aa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.http.server.ResultUtil;
@@ -33,6 +34,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,21 +94,21 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
-
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
-            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, procedureName,
-                    Integer.toString(getArity()));
+            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, procedureName, Integer.toString(getArity()));
             if (procedure == null) {
                 throw new AlgebricksException("There is no procedure with this name " + procedureName + ".");
             }
@@ -118,8 +120,8 @@
                 if (listener.getType() == PrecompiledType.QUERY) {
                     hcc.waitForCompletion(hyracksJobId);
                     ResultReader resultReader = listener.getResultReader();
-                    ResultUtil.printResults(resultReader, ((QueryTranslator) statementExecutor).getSessionConfig(),
-                            new Stats(), null);
+                    ResultUtil.printResults(appCtx, resultReader,
+                            ((QueryTranslator) statementExecutor).getSessionConfig(), new Stats(), null);
                 }
 
             } else {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index abdf90a..f7c3a74 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.bad.lang.statement;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.struct.Identifier;
@@ -79,15 +81,17 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
         FunctionSignature signature = getFunctionSignature();
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
         signature.setNamespace(dataverse);
-
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -111,7 +115,7 @@
             }
             JobId hyracksJobId = listener.getJobId();
             listener.deActivate();
-            ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+            activeEventHandler.removeListener(listener);
             if (hyracksJobId != null) {
                 hcc.destroyJob(hyracksJobId);
             }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index c5b7ef2..a246193 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -26,7 +26,7 @@
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -51,8 +51,8 @@
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
-    private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
-            new AOrderedListType(BuiltinType.AUUID, null));
+    private final AOrderedListSerializerDeserializer subSerDes =
+            new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -70,7 +70,7 @@
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
         eval1 = subEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
-        this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
+        this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
     }