Coordinated change for extensible MetadataProvider

Change-Id: Id7df08806befad531ac071163993b38b25d57847
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 51923ae..3cf2ce6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -236,17 +236,17 @@
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         metadataProvider.getLocks().unlock();
 
-        metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+        metadataProvider = MetadataProvider.create(appCtx, activeDataverse);
         super.handleCreateIndexStatement(metadataProvider, stmt, hcc, requestParameters);
 
         for (Channel channel : usages.first) {
-            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            metadataProvider = MetadataProvider.create(appCtx, activeDataverse);
             BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(), metadataProvider, this, hcc,
                     requestParameters, false);
             metadataProvider.getLocks().unlock();
         }
         for (Procedure procedure : usages.second) {
-            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            metadataProvider = MetadataProvider.create(appCtx, activeDataverse);
             BADJobService.redeployJobSpec(procedure.getEntityId(), procedure.getBody(), metadataProvider, this, hcc,
                     requestParameters, false);
             metadataProvider.getLocks().unlock();
@@ -303,7 +303,7 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         DataverseName dvId = ((DataverseDropStatement) stmt).getDataverseName();
-        MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
+        MetadataProvider tempMdProvider = MetadataProvider.create(appCtx, metadataProvider.getDefaultDataverse());
         tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
         List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
         for (Channel channel : channels) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 3a50777..23b3483 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -130,7 +130,7 @@
                 }
             }
             //Create a metadata provider to use in nested jobs.
-            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
+            MetadataProvider tempMdProvider = MetadataProvider.create(appCtx, metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 2c87711..de9532c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -178,7 +178,7 @@
             RecordConstructor recordCon = new RecordConstructor(fb);
             subscriptionTuple.setBody(recordCon);
             subscriptionTuple.setVarCounter(varCounter);
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+            MetadataProvider tempMdProvider = MetadataProvider.create(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 4f097c3..e8a381d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -136,7 +136,7 @@
                     condition, varCounter);
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(metadataProvider);
             delete.accept(visitor, null);
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+            MetadataProvider tempMdProvider = MetadataProvider.create(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null, null);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index b807cc7..50ddf46 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -305,7 +305,7 @@
             if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, resultsTableName) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
+            MetadataProvider tempMdProvider = MetadataProvider.create(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             final IResultSet resultSet = requestContext.getResultSet();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index 6dcf1fd..fe7e511 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -120,7 +120,7 @@
         //Redeploy Jobs
         for (Channel channel : channels) {
             EntityId entityId = channel.getChannelId();
-            metadataProvider = new MetadataProvider(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
+            metadataProvider = MetadataProvider.create(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
             DeployedJobSpecEventListener listener =
                     new DeployedJobSpecEventListener(appCtx, entityId, channel.getResultsDatasetName().equals("")
                             ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL);
@@ -144,7 +144,7 @@
         }
         for (Procedure procedure : procedures) {
             EntityId entityId = procedure.getEntityId();
-            metadataProvider = new MetadataProvider(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
+            metadataProvider = MetadataProvider.create(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
             metadataProvider.setWriterFactory(PrinterBasedWriterFactory.INSTANCE);
             metadataProvider.setResultSerializerFactoryProvider(ResultSerializerFactoryProvider.INSTANCE);
             DeployedJobSpecEventListener listener =