Added long-term test to verify pre-distributed jobs fix

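Adapted ChannelUnsubscribeStatement to the DeleteStatement constructor change on master (the dataverses/datasets arguments were removed)
Fixed lock management in BAD: each statement now releases its metadata locks in a finally block, and nested statements run on a separate MetadataProvider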

Change-Id: I99e799e203f6ca6082f9c90f04e606c436eb00ee
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 9be02f6..5cc6444 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -59,21 +59,27 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
         List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+        MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                metadataProvider.getStorageComponentProvider());
+        tempMdProvider.setConfig(metadataProvider.getConfig());
         for (Broker broker : brokers) {
+            tempMdProvider.getLocks().reset();
             BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
-            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+            drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
         for (Channel channel : channels) {
+            tempMdProvider.getLocks().reset();
             ChannelDropStatement drop = new ChannelDropStatement(dvId,
                     new Identifier(channel.getChannelId().getEntityName()), false);
-            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+            drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
         for (Procedure procedure : procedures) {
+            tempMdProvider.getLocks().reset();
             ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
                     procedure.getEntityId().getEntityName(), procedure.getArity()), false);
-            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+            drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 018e211..5b81903 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -94,6 +94,8 @@
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 89b0e9a..854ae07 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -116,16 +116,20 @@
                 hcc.destroyJob(hyracksJobId);
             }
 
+            //Create a metadata provider to use in nested jobs.
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.
             //TODO: Prevent datasets for Channels from being dropped elsewhere
             DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getResultsDatasetName()), true);
-            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
+            tempMdProvider.getLocks().reset();
             dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getSubscriptionsDataset()), true);
-            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
 
 
             //Remove the Channel Metadata
@@ -137,6 +141,8 @@
                 QueryTranslator.abort(e, e, mdTxnCtx);
             }
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index bb3cef2..5c5cdf1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -189,8 +189,12 @@
 
             subscriptionTuple.setVarCounter(varCounter);
 
-            if (subscriptionId == null) {
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
 
+            if (subscriptionId == null) {
+                //To create a new subscription
                 VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
                 VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
                 VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
@@ -204,17 +208,25 @@
                 FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
 
                 metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                metadataProvider.setResultAsyncMode(
-                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
+                boolean resultsAsync =
+                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
+                metadataProvider.setResultAsyncMode(resultsAsync);
+                tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+                tempMdProvider.setResultAsyncMode(resultsAsync);
+                tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+                tempMdProvider
+                        .setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+                tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
+
                 InsertStatement insert = new InsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar,
- body);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, body);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
                         resultDelivery, stats, false, null, null);
             } else {
+                //To update an existing subscription
                 UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
                         new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
                         resultDelivery, stats, false, null, null);
             }
 
@@ -222,6 +234,8 @@
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1558508..538a5ea 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -60,18 +60,14 @@
     private final String subscriptionId;
     private final int varCounter;
     private VariableExpr vars;
-    private List<String> dataverses;
-    private List<String> datasets;
 
     public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
-            String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
+            String subscriptionId, int varCounter) {
         this.vars = vars;
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.subscriptionId = subscriptionId;
         this.varCounter = varCounter;
-        this.dataverses = dataverses;
-        this.datasets = datasets;
     }
 
     public Identifier getDataverseName() {
@@ -90,14 +86,6 @@
         return subscriptionId;
     }
 
-    public List<String> getDataverses() {
-        return dataverses;
-    }
-
-    public List<String> getDatasets() {
-        return datasets;
-    }
-
     public int getVarCounter() {
         return varCounter;
     }
@@ -152,15 +140,19 @@
             condition.addOperand(UUIDCall);
 
             DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
-                    new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets);
+                    new Identifier(subscriptionsDatasetName), condition, varCounter);
             AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
             delete.accept(visitor, null);
-
-            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc, false);
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index 2c60a9d..b4f3eae 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -101,6 +101,8 @@
             }
             LOGGER.log(Level.WARNING, "Failed creating a broker", e);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a5e9c7c..f138d4f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -197,6 +197,7 @@
         //Run both statements to create datasets
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
                 hcc);
+        metadataProvider.getLocks().reset();
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
 
     }
@@ -299,14 +300,16 @@
             if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
-
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
             //Create Channel Datasets
-            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+            createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
                     dataverse);
-
+            tempMdProvider.getLocks().reset();
             //Create Channel Internal Job
             JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
-                    metadataProvider, hcc, hdc, stats, dataverse);
+                    tempMdProvider, hcc, hdc, stats, dataverse);
 
             // Now we subscribe
             if (listener == null) {
@@ -332,6 +335,8 @@
             }
             LOGGER.log(Level.WARNING, "Failed creating a channel", e);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 7a1bc54..1db9b26 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -140,10 +140,29 @@
         durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
     }
 
+    /**
+     * Compiles {@code q} in its own metadata transaction; commits on success, aborts and rethrows on failure.
+     */
+    private JobSpecification compileQueryJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
+            JobSpecification jobSpec =
+                    ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return jobSpec;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            // Any failure here (compile or commit) leaves the transaction open, so it is always aborted.
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw e;
+        }
+    }
+
     private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
-            IStatementExecutor statementExecutor,
-            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
-                    throws Exception {
+            IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, Stats stats) throws Exception {
         StringBuilder builder = new StringBuilder();
         builder.append(body);
         builder.append(";");
@@ -157,8 +176,11 @@
                             fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats, true, null, null),
                     PrecompiledType.INSERT);
         } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
-            return new Pair<>(((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider,
-                    (Query) fStatements.get(0), null), PrecompiledType.QUERY);
+            Pair<JobSpecification, PrecompiledType> pair =
+                    new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
+                            PrecompiledType.QUERY);
+            metadataProvider.getLocks().unlock();
+            return pair;
         } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
             AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
             fStatements.get(0).accept(visitor, null);
@@ -170,10 +192,10 @@
     }
 
     private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, MetadataProvider metadataProvider, IHyracksDataset hdc, Stats stats)
+            PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
                     throws Exception {
         JobId jobId = hcc.distributeJob(jobSpec);
-        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, metadataProvider.getResultSetId()));
+        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
     }
 
     @Override
@@ -211,12 +233,22 @@
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
                     Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
 
-            metadataProvider.setResultSetId(new ResultSetId(0));
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
+
             metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+            boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
+            metadataProvider.setResultAsyncMode(resultsAsync);
+            tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+            tempMdProvider.setResultAsyncMode(resultsAsync);
+            tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+            tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+            tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
 
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider, hcc, hdc, stats);
+                    createProcedureJob(getFunctionBody(), statementExecutor, tempMdProvider, hcc, hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
@@ -224,8 +256,8 @@
                 listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>());
                 ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
             }
-
-            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, metadataProvider, hdc, stats);
+            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
+                    stats);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -235,6 +267,8 @@
             }
             LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 7bd00c1..1a319a1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -136,6 +136,8 @@
                 QueryTranslator.abort(e, e, mdTxnCtx);
             }
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 9fe8a83..abdf90a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -125,6 +125,8 @@
                 QueryTranslator.abort(e, e, mdTxnCtx);
             }
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 2001988..adce6ed 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -226,19 +226,12 @@
    }
    | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
       {
-        setDataverses(new ArrayList<String>());
-        setDatasets(new ArrayList<String>());
         VariableExpr varExp = new VariableExpr();
         VarIdentifier var = new VarIdentifier();
         varExp.setVar(var);
         var.setValue("$subscriptionPlaceholder");
         getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
-        List<String> dataverses = getDataverses();
-        List<String> datasets = getDatasets();
-        // we remove the pointer to the dataverses and datasets
-        setDataverses(null);
-        setDatasets(null);
-        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter());
       }
      | "change" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents = QualifiedName()
        <LEFTPAREN> (tmp = Expression()