Added long-term test to verify pre-distributed jobs fix
Fixed bug from master change to DeleteStatement
Fixed Lock Management in BAD
Change-Id: I99e799e203f6ca6082f9c90f04e606c436eb00ee
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 9be02f6..5cc6444 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -59,21 +59,27 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
+ tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
- drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
}
List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
for (Channel channel : channels) {
+ tempMdProvider.getLocks().reset();
ChannelDropStatement drop = new ChannelDropStatement(dvId,
new Identifier(channel.getChannelId().getEntityName()), false);
- drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
for (Procedure procedure : procedures) {
+ tempMdProvider.getLocks().reset();
ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
procedure.getEntityId().getEntityName(), procedure.getArity()), false);
- drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 018e211..5b81903 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -94,6 +94,8 @@
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 89b0e9a..854ae07 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -116,16 +116,20 @@
hcc.destroyJob(hyracksJobId);
}
+ //Create a metadata provider to use in nested jobs.
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
//TODO: Prevent datasets for Channels from being dropped elsewhere
DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
new Identifier(channel.getResultsDatasetName()), true);
- ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-
+ ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
+ tempMdProvider.getLocks().reset();
dropStmt = new DropDatasetStatement(new Identifier(dataverse),
new Identifier(channel.getSubscriptionsDataset()), true);
- ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+ ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
//Remove the Channel Metadata
@@ -137,6 +141,8 @@
QueryTranslator.abort(e, e, mdTxnCtx);
}
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index bb3cef2..5c5cdf1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -189,8 +189,12 @@
subscriptionTuple.setVarCounter(varCounter);
- if (subscriptionId == null) {
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
+ if (subscriptionId == null) {
+ //To create a new subscription
VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
@@ -204,17 +208,25 @@
FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
- metadataProvider.setResultAsyncMode(
- resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
+ boolean resultsAsync =
+ resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
+ metadataProvider.setResultAsyncMode(resultsAsync);
+ tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+ tempMdProvider.setResultAsyncMode(resultsAsync);
+ tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+ tempMdProvider
+ .setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+ tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
+
InsertStatement insert = new InsertStatement(new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar,
- body);
- ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
+ new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, body);
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
resultDelivery, stats, false, null, null);
} else {
+ //To update an existing subscription
UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
- ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
resultDelivery, stats, false, null, null);
}
@@ -222,6 +234,8 @@
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1558508..538a5ea 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -60,18 +60,14 @@
private final String subscriptionId;
private final int varCounter;
private VariableExpr vars;
- private List<String> dataverses;
- private List<String> datasets;
public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
- String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
+ String subscriptionId, int varCounter) {
this.vars = vars;
this.channelName = channelName;
this.dataverseName = dataverseName;
this.subscriptionId = subscriptionId;
this.varCounter = varCounter;
- this.dataverses = dataverses;
- this.datasets = datasets;
}
public Identifier getDataverseName() {
@@ -90,14 +86,6 @@
return subscriptionId;
}
- public List<String> getDataverses() {
- return dataverses;
- }
-
- public List<String> getDatasets() {
- return datasets;
- }
-
public int getVarCounter() {
return varCounter;
}
@@ -152,15 +140,19 @@
condition.addOperand(UUIDCall);
DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets);
+ new Identifier(subscriptionsDatasetName), condition, varCounter);
AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
delete.accept(visitor, null);
-
- ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc, false);
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
+ ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index 2c60a9d..b4f3eae 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -101,6 +101,8 @@
}
LOGGER.log(Level.WARNING, "Failed creating a broker", e);
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a5e9c7c..f138d4f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -197,6 +197,7 @@
//Run both statements to create datasets
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc);
+ metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
}
@@ -299,14 +300,16 @@
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
-
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
//Create Channel Datasets
- createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+ createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
dataverse);
-
+ tempMdProvider.getLocks().reset();
//Create Channel Internal Job
JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
- metadataProvider, hcc, hdc, stats, dataverse);
+ tempMdProvider, hcc, hdc, stats, dataverse);
// Now we subscribe
if (listener == null) {
@@ -332,6 +335,8 @@
}
LOGGER.log(Level.WARNING, "Failed creating a channel", e);
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 7a1bc54..1db9b26 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -140,10 +140,29 @@
durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
}
+ private JobSpecification compileQueryJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, Query q) throws Exception {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = null;
+ try {
+ jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, e.getMessage(), e);
+ if (bActiveTxn) {
+ ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ }
+ return jobSpec;
+ }
+
private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
- IStatementExecutor statementExecutor,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
- throws Exception {
+ IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ IHyracksDataset hdc, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append(body);
builder.append(";");
@@ -157,8 +176,11 @@
fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats, true, null, null),
PrecompiledType.INSERT);
} else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
- return new Pair<>(((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider,
- (Query) fStatements.get(0), null), PrecompiledType.QUERY);
+ Pair<JobSpecification, PrecompiledType> pair =
+ new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
+ PrecompiledType.QUERY);
+ metadataProvider.getLocks().unlock();
+ return pair;
} else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
fStatements.get(0).accept(visitor, null);
@@ -170,10 +192,10 @@
}
private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
- PrecompiledJobEventListener listener, MetadataProvider metadataProvider, IHyracksDataset hdc, Stats stats)
+ PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
throws Exception {
JobId jobId = hcc.distributeJob(jobSpec);
- listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, metadataProvider.getResultSetId()));
+ listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
}
@Override
@@ -211,12 +233,22 @@
procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
- metadataProvider.setResultSetId(new ResultSetId(0));
+ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+ metadataProvider.getStorageComponentProvider());
+ tempMdProvider.setConfig(metadataProvider.getConfig());
+
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+ boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
+ metadataProvider.setResultAsyncMode(resultsAsync);
+ tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+ tempMdProvider.setResultAsyncMode(resultsAsync);
+ tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+ tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+ tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
//Create Procedure Internal Job
Pair<JobSpecification, PrecompiledType> procedureJobSpec =
- createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider, hcc, hdc, stats);
+ createProcedureJob(getFunctionBody(), statementExecutor, tempMdProvider, hcc, hdc, stats);
// Now we subscribe
if (listener == null) {
@@ -224,8 +256,8 @@
listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>());
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
}
-
- setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, metadataProvider, hdc, stats);
+ setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
+ stats);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -235,6 +267,8 @@
}
LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 7bd00c1..1a319a1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -136,6 +136,8 @@
QueryTranslator.abort(e, e, mdTxnCtx);
}
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 9fe8a83..abdf90a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -125,6 +125,8 @@
QueryTranslator.abort(e, e, mdTxnCtx);
}
throw new HyracksDataException(e);
+ } finally {
+ metadataProvider.getLocks().unlock();
}
}
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 2001988..adce6ed 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -226,19 +226,12 @@
}
| "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
{
- setDataverses(new ArrayList<String>());
- setDatasets(new ArrayList<String>());
VariableExpr varExp = new VariableExpr();
VarIdentifier var = new VarIdentifier();
varExp.setVar(var);
var.setValue("$subscriptionPlaceholder");
getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
- List<String> dataverses = getDataverses();
- List<String> datasets = getDatasets();
- // we remove the pointer to the dataverses and datasets
- setDataverses(null);
- setDatasets(null);
- stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+ stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter());
}
| "change" "subscription" subscriptionId = StringLiteral() <ON> nameComponents = QualifiedName()
<LEFTPAREN> (tmp = Expression()