Coordinated change to adopt multipart dataverse names in the BAD extension
Change-Id: Ib9378478b416e2dc633801a560488fb490ef8596
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 0467f6e..a787778 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.bad;
+import org.apache.asterix.common.metadata.DataverseName;
+
public interface BADConstants {
String SubscriptionId = "subscriptionId";
String BrokerName = "BrokerName";
@@ -41,7 +43,6 @@
String subscriptionEnding = "Subscriptions";
String resultsEnding = "Results";
String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
- String BAD_DATAVERSE_NAME = "Metadata";
String Duration = "Duration";
String Function = "Function";
String FIELD_NAME_ARITY = "Arity";
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index cd3883a..6378ee6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -99,7 +99,7 @@
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
- + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
+ + entityId.getDataverseName() + "." + entityId.getEntityName() + ".", e);
}
}
}, period, period, TimeUnit.MILLISECONDS);
@@ -113,7 +113,7 @@
null, listener, null);
if (executionMilliseconds > period) {
LOGGER.log(Level.SEVERE,
- "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
+ entityId.getEntityName() + " was unable to meet the required period of " + period
+ " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
+ new Date());
@@ -147,8 +147,9 @@
}
LOGGER.log(Level.SEVERE,
- "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
- + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
+ "Deployed Job execution completed for " + entityId.getExtensionName() + " "
+ + entityId.getDataverseName() + "." + entityId.getEntityName() + ". Took "
+ + executionMilliseconds + " milliseconds ");
return executionMilliseconds;
@@ -282,7 +283,7 @@
} else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
} else {
- SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
+ SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(metadataProvider);
procedureStatement.accept(visitor, null);
return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
hcc, true, null, null);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
index 4a92cf0..4e94508 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -33,6 +33,7 @@
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.bad.metadata.ProcedureSearchKey;
import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -67,7 +68,7 @@
return ExtensionKind.LANG;
}
- public static Broker getBroker(MetadataTransactionContext mdTxnCtx, String dataverseName, String brokerName)
+ public static Broker getBroker(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String brokerName)
throws AlgebricksException {
BrokerSearchKey brokerSearchKey = new BrokerSearchKey(dataverseName, brokerName);
List<Broker> brokers = MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
@@ -80,8 +81,8 @@
}
}
- public static Channel getChannel(MetadataTransactionContext mdTxnCtx, String dataverseName, String channelName)
- throws AlgebricksException {
+ public static Channel getChannel(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
+ String channelName) throws AlgebricksException {
ChannelSearchKey channelSearchKey = new ChannelSearchKey(dataverseName, channelName);
List<Channel> channels = MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
if (channels.isEmpty()) {
@@ -93,7 +94,7 @@
}
}
- public static Procedure getProcedure(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ public static Procedure getProcedure(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
String procedureName, String arity) throws AlgebricksException {
ProcedureSearchKey procedureSearchKey = new ProcedureSearchKey(dataverseName, procedureName, arity);
List<Procedure> procedures = MetadataManager.INSTANCE.getEntities(mdTxnCtx, procedureSearchKey);
@@ -106,7 +107,7 @@
}
}
- public static List<Broker> getBrokers(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ public static List<Broker> getBrokers(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName)
throws AlgebricksException {
DataverseBrokersSearchKey brokerSearchKey = new DataverseBrokersSearchKey(dataverseName);
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
@@ -117,13 +118,13 @@
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
}
- public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName)
throws AlgebricksException {
DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName);
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
}
- public static List<Procedure> getProcedures(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ public static List<Procedure> getProcedures(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName)
throws AlgebricksException {
DataverseProceduresSearchKey proceduresSearchKey = new DataverseProceduresSearchKey(dataverseName);
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index e57a2e5..51923ae 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
@@ -53,6 +54,7 @@
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.client.IHyracksClientConnection;
public class BADStatementExecutor extends QueryTranslator {
@@ -65,15 +67,15 @@
//TODO: Most of this file could go away if we had metadata dependencies
private Pair<List<Channel>, List<Procedure>> checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx,
- String dataverse, String dataset, boolean checkAll) throws AlgebricksException {
+ DataverseName dataverse, String dataset, boolean checkAll) throws AlgebricksException {
List<Channel> channelsUsingDataset = new ArrayList<>();
List<Procedure> proceduresUsingDataset = new ArrayList<>();
List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
- List<List<List<String>>> dependencies = channel.getDependencies();
- List<List<String>> datasetDependencies = dependencies.get(0);
- for (List<String> dependency : datasetDependencies) {
- if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
+ List<List<Triple<DataverseName, String, String>>> dependencies = channel.getDependencies();
+ List<Triple<DataverseName, String, String>> datasetDependencies = dependencies.get(0);
+ for (Triple<DataverseName, String, String> dependency : datasetDependencies) {
+ if (dependency.first.equals(dataverse) && dependency.second.equals(dataset)) {
channelsUsingDataset.add(channel);
if (!checkAll) {
return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
@@ -85,10 +87,10 @@
}
List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
for (Procedure procedure : procedures) {
- List<List<List<String>>> dependencies = procedure.getDependencies();
- List<List<String>> datasetDependencies = dependencies.get(0);
- for (List<String> dependency : datasetDependencies) {
- if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
+ List<List<Triple<DataverseName, String, String>>> dependencies = procedure.getDependencies();
+ List<Triple<DataverseName, String, String>> datasetDependencies = dependencies.get(0);
+ for (Triple<DataverseName, String, String> dependency : datasetDependencies) {
+ if (dependency.first.equals(dataverse) && dependency.second.equals(dataset)) {
proceduresUsingDataset.add(procedure);
if (!checkAll) {
return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
@@ -101,18 +103,18 @@
}
private Pair<List<Channel>, List<Procedure>> checkIfFunctionIsInUse(MetadataTransactionContext mdTxnCtx,
- String dvId, String function, String arity, boolean checkAll)
+ DataverseName dvId, String function, String arity, boolean checkAll)
throws CompilationException, AlgebricksException {
List<Channel> channelsUsingFunction = new ArrayList<>();
List<Procedure> proceduresUsingFunction = new ArrayList<>();
List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
- List<List<List<String>>> dependencies = channel.getDependencies();
- List<List<String>> datasetDependencies = dependencies.get(1);
- for (List<String> dependency : datasetDependencies) {
- if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
- && dependency.get(2).equals(arity)) {
+ List<List<Triple<DataverseName, String, String>>> dependencies = channel.getDependencies();
+ List<Triple<DataverseName, String, String>> datasetDependencies = dependencies.get(1);
+ for (Triple<DataverseName, String, String> dependency : datasetDependencies) {
+ if (dependency.first.equals(dvId) && dependency.second.equals(function)
+ && dependency.third.equals(arity)) {
channelsUsingFunction.add(channel);
if (!checkAll) {
return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
@@ -123,11 +125,11 @@
}
List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
for (Procedure procedure : procedures) {
- List<List<List<String>>> dependencies = procedure.getDependencies();
- List<List<String>> datasetDependencies = dependencies.get(1);
- for (List<String> dependency : datasetDependencies) {
- if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
- && dependency.get(2).equals(arity)) {
+ List<List<Triple<DataverseName, String, String>>> dependencies = procedure.getDependencies();
+ List<Triple<DataverseName, String, String>> datasetDependencies = dependencies.get(1);
+ for (Triple<DataverseName, String, String> dependency : datasetDependencies) {
+ if (dependency.first.equals(dvId) && dependency.second.equals(function)
+ && dependency.third.equals(arity)) {
proceduresUsingFunction.add(procedure);
if (!checkAll) {
return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
@@ -139,7 +141,7 @@
return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
}
- private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
+ private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String dataset)
throws CompilationException, AlgebricksException {
Pair<List<Channel>, List<Procedure>> dependents = checkIfDatasetIsInUse(mdTxnCtx, dataverse, dataset, false);
if (dependents.first.size() > 0) {
@@ -152,7 +154,7 @@
}
}
- private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, String dataverse, String function,
+ private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String function,
String arity, FunctionSignature sig) throws CompilationException, AlgebricksException {
Pair<List<Channel>, List<Procedure>> dependents =
checkIfFunctionIsInUse(mdTxnCtx, dataverse, function, arity, false);
@@ -172,7 +174,7 @@
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- String dvId = getActiveDataverse(((DropDatasetStatement) stmt).getDataverseName());
+ DataverseName dvId = getActiveDataverseName(((DropDatasetStatement) stmt).getDataverseName());
Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName();
throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
@@ -188,17 +190,17 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
//Allow channels to use the new index
- String dvId = getActiveDataverse(((CreateIndexStatement) stmt).getDataverseName());
+ DataverseName dvId = getActiveDataverseName(((CreateIndexStatement) stmt).getDataverseName());
String dsId = ((CreateIndexStatement) stmt).getDatasetName().getValue();
Pair<List<Channel>, List<Procedure>> usages = checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId, true);
List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
for (Dataverse dv : dataverseList) {
- List<Function> functions = MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+ List<Function> functions = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dv.getDataverseName());
for (Function function : functions) {
- for (List<String> datasetDependency : function.getDependencies().get(0)) {
- if (datasetDependency.get(0).equals(dvId) && datasetDependency.get(1).equals(dsId)) {
+ for (Triple<DataverseName, String, String> datasetDependency : function.getDependencies().get(0)) {
+ if (datasetDependency.first.equals(dvId) && datasetDependency.second.equals(dsId)) {
Pair<List<Channel>, List<Procedure>> functionUsages =
checkIfFunctionIsInUse(mdTxnCtx, function.getDataverseName(), function.getName(),
Integer.toString(function.getArity()), true);
@@ -257,17 +259,17 @@
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- String dvId = getActiveDataverse(((IndexDropStatement) stmt).getDataverseName());
+ DataverseName dvId = getActiveDataverseName(((IndexDropStatement) stmt).getDataverseName());
Identifier dsId = ((IndexDropStatement) stmt).getDatasetName();
throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
for (Dataverse dv : dataverseList) {
- List<Function> functions = MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+ List<Function> functions = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dv.getDataverseName());
for (Function function : functions) {
- for (List<String> datasetDependency : function.getDependencies().get(0)) {
- if (datasetDependency.get(0).equals(dvId) && datasetDependency.get(1).equals(dsId.getValue())) {
+ for (Triple<DataverseName, String, String> datasetDependency : function.getDependencies().get(0)) {
+ if (datasetDependency.first.equals(dvId) && datasetDependency.second.equals(dsId.getValue())) {
throwErrorIfFunctionUsed(mdTxnCtx, function.getDataverseName(), function.getName(),
Integer.toString(function.getArity()), null);
}
@@ -285,7 +287,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
FunctionSignature sig = ((FunctionDropStatement) stmt).getFunctionSignature();
- String dvId = getActiveDataverseName(sig.getNamespace());
+ DataverseName dvId = getActiveDataverseName(sig.getDataverseName());
String function = sig.getName();
String arity = Integer.toString(sig.getArity());
@@ -300,41 +302,41 @@
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
+ DataverseName dvId = ((DataverseDropStatement) stmt).getDataverseName();
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
- if (channel.getChannelId().getDataverse().equals(dvId.getValue())) {
+ if (channel.getChannelId().getDataverseName().equals(dvId)) {
continue;
}
- List<List<List<String>>> dependencies = channel.getDependencies();
- for (List<List<String>> dependencyList : dependencies) {
- for (List<String> dependency : dependencyList) {
- if (dependency.get(0).equals(dvId.getValue())) {
- throw new CompilationException("Cannot drop dataverse " + dvId.getValue() + ". "
- + channel.getChannelId() + " depends on it!");
+ List<List<Triple<DataverseName, String, String>>> dependencies = channel.getDependencies();
+ for (List<Triple<DataverseName, String, String>> dependencyList : dependencies) {
+ for (Triple<DataverseName, String, String> dependency : dependencyList) {
+ if (dependency.first.equals(dvId)) {
+ throw new CompilationException(
+ "Cannot drop dataverse " + dvId + ". " + channel.getChannelId() + " depends on it!");
}
}
}
}
List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
for (Procedure procedure : procedures) {
- if (procedure.getEntityId().getDataverse().equals(dvId.getValue())) {
+ if (procedure.getEntityId().getDataverseName().equals(dvId)) {
continue;
}
- List<List<List<String>>> dependencies = procedure.getDependencies();
- for (List<List<String>> dependencyList : dependencies) {
- for (List<String> dependency : dependencyList) {
- if (dependency.get(0).equals(dvId.getValue())) {
- throw new CompilationException("Cannot drop dataverse " + dvId.getValue() + ". "
- + procedure.getEntityId() + " depends on it!");
+ List<List<Triple<DataverseName, String, String>>> dependencies = procedure.getDependencies();
+ for (List<Triple<DataverseName, String, String>> dependencyList : dependencies) {
+ for (Triple<DataverseName, String, String> dependency : dependencyList) {
+ if (dependency.first.equals(dvId)) {
+ throw new CompilationException(
+ "Cannot drop dataverse " + dvId + ". " + procedure.getEntityId() + " depends on it!");
}
}
}
}
for (Channel channel : channels) {
- if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) {
+ if (!channel.getChannelId().getDataverseName().equals(dvId)) {
continue;
}
tempMdProvider.getLocks().reset();
@@ -343,15 +345,15 @@
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
for (Procedure procedure : procedures) {
- if (!procedure.getEntityId().getDataverse().equals(dvId.getValue())) {
+ if (!procedure.getEntityId().getDataverseName().equals(dvId)) {
continue;
}
tempMdProvider.getLocks().reset();
- ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
- procedure.getEntityId().getEntityName(), procedure.getArity()), false);
+ ProcedureDropStatement drop = new ProcedureDropStatement(
+ new FunctionSignature(dvId, procedure.getEntityId().getEntityName(), procedure.getArity()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
- List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+ List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId);
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 4f68f40..26c315a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -23,6 +23,7 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
@@ -36,11 +37,11 @@
public class BrokerDropStatement extends ExtensionStatement {
- private final Identifier dataverseName;
+ private final DataverseName dataverseName;
private final Identifier brokerName;
private boolean ifExists;
- public BrokerDropStatement(Identifier dataverseName, Identifier brokerName, boolean ifExists) {
+ public BrokerDropStatement(DataverseName dataverseName, Identifier brokerName, boolean ifExists) {
this.brokerName = brokerName;
this.dataverseName = dataverseName;
this.ifExists = ifExists;
@@ -50,7 +51,7 @@
return ifExists;
}
- public Identifier getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
@@ -73,7 +74,7 @@
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
//TODO: dont drop a broker that's being used
- String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 596b436..3a50777 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -32,6 +32,7 @@
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -48,17 +49,17 @@
public class ChannelDropStatement extends ExtensionStatement {
private static final Logger LOGGER = Logger.getLogger(ChannelDropStatement.class.getName());
- private final Identifier dataverseName;
+ private final DataverseName dataverseName;
private final Identifier channelName;
private boolean ifExists;
- public ChannelDropStatement(Identifier dataverseName, Identifier channelName, boolean ifExists) {
+ public ChannelDropStatement(DataverseName dataverseName, Identifier channelName, boolean ifExists) {
this.dataverseName = dataverseName;
this.channelName = channelName;
this.ifExists = ifExists;
}
- public Identifier getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
@@ -84,7 +85,7 @@
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
- String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
@@ -112,14 +113,14 @@
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
"Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName()
- + " " + entityId.getDataverse() + "." + entityId.getEntityName() + ".");
+ + " " + entityId.getDataverseName() + "." + entityId.getEntityName() + ".");
} else {
listener.getExecutorService().shutdown();
if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE,
"Executor Service is terminating non-gracefully for: " + entityId.getExtensionName() + " "
- + entityId.getDataverse() + "." + entityId.getEntityName());
+ + entityId.getDataverseName() + "." + entityId.getEntityName());
}
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();
@@ -136,12 +137,11 @@
//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
- DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
- new Identifier(channel.getResultsDatasetName()), true);
+ DropDatasetStatement dropStmt =
+ new DropDatasetStatement(dataverse, new Identifier(channel.getResultsDatasetName()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
tempMdProvider.getLocks().reset();
- dropStmt = new DropDatasetStatement(new Identifier(dataverse),
- new Identifier(channel.getSubscriptionsDataset()), true);
+ dropStmt = new DropDatasetStatement(dataverse, new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 3703c94..2c87711 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.FieldAccessor;
@@ -61,16 +62,16 @@
public class ChannelSubscribeStatement extends ExtensionStatement {
- private final Identifier dataverseName;
+ private final DataverseName dataverseName;
private final Identifier channelName;
- private final Identifier brokerDataverseName;
+ private final DataverseName brokerDataverseName;
private final Identifier brokerName;
private final List<Expression> argList;
private final String subscriptionId;
private final int varCounter;
- public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
- int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
+ public ChannelSubscribeStatement(DataverseName dataverseName, Identifier channelName, List<Expression> argList,
+ int varCounter, DataverseName brokerDataverseName, Identifier brokerName, String subscriptionId) {
this.channelName = channelName;
this.dataverseName = dataverseName;
this.brokerDataverseName = brokerDataverseName;
@@ -80,11 +81,11 @@
this.varCounter = varCounter;
}
- public Identifier getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
- public Identifier getBrokerDataverseName() {
+ public DataverseName getBrokerDataverseName() {
return brokerDataverseName;
}
@@ -122,8 +123,8 @@
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
- String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
- String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
+ DataverseName brokerDataverse = statementExecutor.getActiveDataverseName(brokerDataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -149,7 +150,7 @@
List<FieldBinding> fb = new ArrayList<>();
LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
- Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
+ Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse.getCanonicalForm()));
fb.add(new FieldBinding(leftExpr, rightExpr));
leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
@@ -162,8 +163,7 @@
List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
- FunctionSignature UUIDfunc =
- new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
+ FunctionSignature UUIDfunc = new FunctionSignature(function);
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
rightExpr = UUIDCall;
@@ -204,14 +204,14 @@
tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
tempMdProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
- InsertStatement insert = new InsertStatement(new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
+ InsertStatement insert = new InsertStatement(dataverse, new Identifier(subscriptionsDatasetName),
+ subscriptionTuple, varCounter, resultVar, accessor);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc,
resultSet, resultDelivery, null, stats, false, requestParameters, null, null);
} else {
//To update an existing subscription
- UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
+ UpsertStatement upsert = new UpsertStatement(dataverse, new Identifier(subscriptionsDatasetName),
+ subscriptionTuple, varCounter, null, null);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc,
resultSet, resultDelivery, null, stats, false, requestParameters, null, null);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index c51ad27..4f097c3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.FieldAccessor;
@@ -53,13 +54,13 @@
public class ChannelUnsubscribeStatement extends ExtensionStatement {
- private final Identifier dataverseName;
+ private final DataverseName dataverseName;
private final Identifier channelName;
private final String subscriptionId;
private final int varCounter;
private VariableExpr vars;
- public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
+ public ChannelUnsubscribeStatement(VariableExpr vars, DataverseName dataverseName, Identifier channelName,
String subscriptionId, int varCounter) {
this.vars = vars;
this.channelName = channelName;
@@ -68,7 +69,7 @@
this.varCounter = varCounter;
}
- public Identifier getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
@@ -102,7 +103,7 @@
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
- String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -126,15 +127,14 @@
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
- FunctionSignature UUIDfunc =
- new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
+ FunctionSignature UUIDfunc = new FunctionSignature(function);
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
condition.addOperand(UUIDCall);
- DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), condition, varCounter);
- SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
+ DeleteStatement delete = new DeleteStatement(vars, dataverse, new Identifier(subscriptionsDatasetName),
+ condition, varCounter);
+ SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(metadataProvider);
delete.accept(visitor, null);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index 367599b..31dd7cc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -26,6 +26,7 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
@@ -40,11 +41,11 @@
public class CreateBrokerStatement extends ExtensionStatement {
private static final Logger LOGGER = Logger.getLogger(CreateBrokerStatement.class.getName());
- private final Identifier dataverseName;
+ private final DataverseName dataverseName;
private final Identifier brokerName;
private String endPointName;
- public CreateBrokerStatement(Identifier dataverseName, Identifier brokerName, String endPointName) {
+ public CreateBrokerStatement(DataverseName dataverseName, Identifier brokerName, String endPointName) {
this.brokerName = brokerName;
this.dataverseName = dataverseName;
this.endPointName = endPointName;
@@ -54,7 +55,7 @@
return endPointName;
}
- public Identifier getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
@@ -76,7 +77,7 @@
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
- String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index e90e1f8..b807cc7 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -45,6 +45,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -59,10 +60,12 @@
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.lang.sqlpp.util.SqlppStatementUtil;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
@@ -82,15 +85,14 @@
private final Identifier channelName;
private final FunctionSignature function;
private final CallExpr period;
- private Identifier dataverseName;
+ private DataverseName dataverseName;
private String duration;
private String body;
private String subscriptionsTableName;
private String resultsTableName;
- private String dataverse;
private final boolean push;
- public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
+ public CreateChannelStatement(DataverseName dataverseName, Identifier channelName, FunctionSignature function,
Expression period, boolean push) {
this.channelName = channelName;
this.dataverseName = dataverseName;
@@ -100,7 +102,7 @@
this.push = push;
}
- public Identifier getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
@@ -169,7 +171,7 @@
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
- new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
+ MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName, null, null, null,
new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
@@ -183,7 +185,7 @@
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
- new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
+ MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName, null, null, null, new HashMap<>(),
DatasetType.INTERNAL, idd, null, true);
//Create an index on timestamp for results
@@ -216,6 +218,7 @@
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IResultSet resultSet, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
+ CharSequence dataverse = SqlppStatementUtil.encloseDataverseName(new StringBuilder(), dataverseName);
builder.append("SET inline_with \"false\";\n");
if (!push) {
builder.append("insert into " + dataverse + "." + resultsTableName);
@@ -227,8 +230,9 @@
builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
- builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
- builder.append(function.getNamespace() + "." + function.getName() + "(");
+ builder.append(MetadataConstants.METADATA_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
+ //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
+ builder.append(function.getDataverseName().getCanonicalForm() + "." + function.getName() + "(");
int i = 0;
for (; i < function.getArity() - 1; i++) {
builder.append("sub.param" + i + ",");
@@ -266,12 +270,11 @@
//TODO: Figure out how to handle when a subset of the 3 tasks fails
- dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
- dataverse = dataverseName.getValue();
+ dataverseName = statementExecutor.getActiveDataverseName(dataverseName);
subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
- EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+ EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -283,7 +286,7 @@
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+ channel = BADLangExtension.getChannel(mdTxnCtx, dataverseName, channelName.getValue());
if (channel != null) {
throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
}
@@ -296,10 +299,10 @@
initialize(mdTxnCtx);
//check if names are available before creating anything
- if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, subscriptionsTableName) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
- if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
+ if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, resultsTableName) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
@@ -323,8 +326,8 @@
BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
duration);
- channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
- duration, null, body);
+ channel = new Channel(dataverseName, channelName.getValue(), subscriptionsTableName, resultsTableName,
+ function, duration, null, body);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index f9677ac..f45f0b4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -22,7 +22,6 @@
import java.io.DataOutputStream;
import java.io.StringReader;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -45,6 +44,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -54,11 +54,11 @@
import org.apache.asterix.lang.common.statement.DeleteStatement;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
+import org.apache.asterix.lang.sqlpp.util.SqlppStatementUtil;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -71,6 +71,7 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
@@ -89,7 +90,7 @@
private final List<VariableExpr> varList;
private final CallExpr period;
private String duration = "";
- private List<List<List<String>>> dependencies;
+ private List<List<Triple<DataverseName, String, String>>> dependencies;
public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
List<Integer> paramIds, String functionBody, Expression period) {
@@ -115,7 +116,8 @@
Matcher matcher = variableReference.matcher(newBody);
newBody = matcher.replaceAll("$1get_job_param(\"" + var.getVar() + "\")$2");
}
- return "use " + signature.getNamespace() + ";\n" + newBody + ";";
+ return "use " + SqlppStatementUtil.encloseDataverseName(new StringBuilder(), signature.getDataverseName())
+ + ";\n" + newBody + ";";
}
public String getProcedureBody() {
@@ -181,9 +183,9 @@
throw new CompilationException("Insert procedures cannot have parameters");
}
InsertStatement insertStatement = (InsertStatement) getProcedureBodyStatement();
- dependencies.get(0).add(Arrays.asList(
- ((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()),
- insertStatement.getDatasetName().getValue()));
+ dependencies.get(0)
+ .add(new Triple<>(statementExecutor.getActiveDataverseName(insertStatement.getDataverseName()),
+ insertStatement.getDatasetName(), null));
return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null, null, null),
PrecompiledType.INSERT);
@@ -197,7 +199,7 @@
((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(0));
return pair;
} else if (getProcedureBodyStatement().getKind() == Statement.Kind.DELETE) {
- SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
+ SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(metadataProvider);
getProcedureBodyStatement().accept(visitor, null);
DeleteStatement delete = (DeleteStatement) getProcedureBodyStatement();
@@ -228,8 +230,7 @@
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
initialize();
- String dataverse =
- ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(signature.getDataverseName());
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 4ff2092..858cbb2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -37,10 +37,10 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.expression.LiteralExpr;
-import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -60,19 +60,20 @@
public static final String WAIT_FOR_COMPLETION = "wait-for-completion-procedure";
- private final String dataverseName;
+ private final DataverseName dataverseName;
private final String procedureName;
private final int arity;
private final List<Expression> argList;
- public ExecuteProcedureStatement(String dataverseName, String procedureName, int arity, List<Expression> argList) {
+ public ExecuteProcedureStatement(DataverseName dataverseName, String procedureName, int arity,
+ List<Expression> argList) {
this.dataverseName = dataverseName;
this.procedureName = procedureName;
this.arity = arity;
this.argList = argList;
}
- public String getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
@@ -101,7 +102,7 @@
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
- String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
+ DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 74ea7e0..193c0cb 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -33,7 +33,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -82,11 +82,10 @@
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
FunctionSignature signature = getFunctionSignature();
- String dataverse =
- ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
- signature.setNamespace(dataverse);
+ DataverseName dataverseName = statementExecutor.getActiveDataverseName(signature.getDataverseName());
+ signature.setDataverseName(dataverseName);
boolean txnActive = false;
- EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
+ EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, signature.getName());
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
if (listener.isActive()) {
@@ -100,7 +99,7 @@
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
txnActive = true;
- procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverseName, signature.getName(),
Integer.toString(signature.getArity()));
txnActive = false;
if (procedure == null) {
@@ -116,7 +115,7 @@
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
"Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName()
- + " " + entityId.getDataverse() + "." + entityId.getEntityName() + ".");
+ + " " + entityId.getDataverseName() + "." + entityId.getEntityName() + ".");
} else {
if (listener.getExecutorService() != null) {
listener.getExecutorService().shutdown();
@@ -124,7 +123,7 @@
TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE,
"Executor Service is terminating non-gracefully for: " + entityId.getExtensionName()
- + " " + entityId.getDataverse() + "." + entityId.getEntityName());
+ + " " + entityId.getDataverseName() + "." + entityId.getEntityName());
}
}
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd2ff86..49371f9 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -34,6 +34,7 @@
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
+import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -44,21 +45,21 @@
public static final ExtensionId BAD_METADATA_EXTENSION_ID =
new ExtensionId(BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
- public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Dataverse BAD_DATAVERSE = new Dataverse(MetadataConstants.METADATA_DATAVERSE_NAME,
NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
- public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
- public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_RESULT_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
- public static final Datatype BAD_BROKER_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_BROKER_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
BADConstants.RECORD_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
- public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
- public static final Datatype BAD_PROCEDURE_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_PROCEDURE_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
BADConstants.RECORD_TYPENAME_PROCEDURE, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, false);
@Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
index fa35fd5..0722c48 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IMetadataEntityTupleTranslatorFactory;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
@@ -55,28 +56,26 @@
public static final int NUM_FIELDS_BROKER_IDX = 3;
public static final int NUM_FIELDS_PROCEDURE_IDX = 4;
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
+ public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset<>(PROPERTIES_CHANNEL,
NUM_FIELDS_CHANNEL_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
Arrays.asList(BADConstants.ChannelName)),
0, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, true, new int[] { 0, 1 }, BAD_CHANNEL_INDEX_ID,
- () -> new ChannelTupleTranslator(true));
+ (IMetadataEntityTupleTranslatorFactory<Channel>) ChannelTupleTranslator::new);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset(PROPERTIES_BROKER,
+ public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset<>(PROPERTIES_BROKER,
NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
Arrays.asList(BADConstants.BrokerName)),
0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
- () -> new BrokerTupleTranslator(true));
+ (IMetadataEntityTupleTranslatorFactory<Broker>) BrokerTupleTranslator::new);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static final ExtensionMetadataDataset PROCEDURE_DATASET = new ExtensionMetadataDataset(PROPERTIES_PROCEDURE,
- NUM_FIELDS_PROCEDURE_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
- Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
- Arrays.asList(BADConstants.ProcedureName), Arrays.asList(BADConstants.FIELD_NAME_ARITY)),
- 0, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, true, new int[] { 0, 1, 2 }, BAD_PROCEDURE_INDEX_ID,
- () -> new ProcedureTupleTranslator(true));
+ public static final ExtensionMetadataDataset PROCEDURE_DATASET =
+ new ExtensionMetadataDataset<>(PROPERTIES_PROCEDURE, NUM_FIELDS_PROCEDURE_IDX,
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+ Arrays.asList(BADConstants.ProcedureName), Arrays.asList(BADConstants.FIELD_NAME_ARITY)),
+ 0, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, true, new int[] { 0, 1, 2 }, BAD_PROCEDURE_INDEX_ID,
+ (IMetadataEntityTupleTranslatorFactory<Procedure>) ProcedureTupleTranslator::new);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
index 006f0dc..feabf3f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
@@ -15,6 +15,7 @@
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
@@ -23,19 +24,19 @@
*/
public class Broker implements IExtensionMetadataEntity {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- private final String dataverseName;
+ private final DataverseName dataverseName;
private final String brokerName;
private final String endPointName;
- public Broker(String dataverseName, String brokerName, String endPointName) {
+ public Broker(DataverseName dataverseName, String brokerName, String endPointName) {
this.endPointName = endPointName;
this.dataverseName = dataverseName;
this.brokerName = brokerName;
}
- public String getDataverseName() {
+ public DataverseName getDataverseName() {
return dataverseName;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
index b73e9e3..8b29896 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
@@ -18,17 +18,18 @@
*/
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class BrokerSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
+ private static final long serialVersionUID = 2L;
+ private final DataverseName dataverse;
private final String broker;
- public BrokerSearchKey(String dataverse, String broker) {
+ public BrokerSearchKey(DataverseName dataverse, String broker) {
this.dataverse = dataverse;
this.broker = broker;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
index de1aab8..1114d06 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -15,16 +15,11 @@
package org.apache.asterix.bad.metadata;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-
import org.apache.asterix.common.exceptions.MetadataException;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -41,46 +36,32 @@
// Payload field containing serialized broker.
public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes =
- SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
-
- @SuppressWarnings("unchecked")
public BrokerTupleTranslator(boolean getTuple) {
- super(getTuple, BADMetadataIndexes.NUM_FIELDS_BROKER_IDX);
+ super(getTuple, BADMetadataIndexes.BROKER_DATASET, BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
}
@Override
- public Broker getMetadataEntityFromTuple(ITupleReference frameTuple) throws HyracksDataException {
- byte[] serRecord = frameTuple.getFieldData(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordStartOffset = frameTuple.getFieldStart(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordLength = frameTuple.getFieldLength(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
- ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
- DataInput in = new DataInputStream(stream);
- ARecord channelRecord = recordSerDes.deserialize(in);
- return createBrokerFromARecord(channelRecord);
- }
-
- private Broker createBrokerFromARecord(ARecord brokerRecord) {
- Broker broker = null;
- String dataverseName =
+ public Broker createMetadataEntityFromARecord(ARecord brokerRecord) throws HyracksDataException {
+ String dataverseCanonicalName =
((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX))
.getStringValue();
+ DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverseCanonicalName);
String brokerName =
((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX)).getStringValue();
String endPointName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX))
.getStringValue();
- broker = new Broker(dataverseName, brokerName, endPointName);
- return broker;
+ return new Broker(dataverseName, brokerName, endPointName);
}
@Override
public ITupleReference getTupleFromMetadataEntity(Broker broker) throws HyracksDataException, MetadataException {
+ String dataverseCanonicalName = broker.getDataverseName().getCanonicalForm();
+
// write the key in the first fields of the tuple
tupleBuilder.reset();
- aString.setValue(broker.getDataverseName());
+ aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
@@ -92,7 +73,7 @@
// write field 0
fieldValue.reset();
- aString.setValue(broker.getDataverseName());
+ aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index ed9346c..9c5a1f0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -16,21 +16,22 @@
package org.apache.asterix.bad.metadata;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+import org.apache.hyracks.algebricks.common.utils.Triple;
/**
* Metadata describing a channel.
*/
public class Channel implements IExtensionMetadataEntity {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
/** A unique identifier for the channel */
protected final EntityId channelId;
@@ -39,35 +40,37 @@
private final String duration;
private final String channelBody;
private final FunctionSignature function;
- private final List<String> functionAsPath;
+ private final Triple<DataverseName, String, String> functionAsPath;
/*
Dependencies are stored as an array of size two:
element 0 is a list of dataset dependencies
- -stored as lists of [DataverseName, Dataset] for the datasets
+ -stored as triples of [DataverseName, Dataset, null] for the datasets
element 1 is a list of function dependencies
- -stored as lists of [DataverseName, FunctionName, Arity] for the functions
+ -stored as triples of [DataverseName, FunctionName, Arity] for the functions
*/
- private final List<List<List<String>>> dependencies;
+ private final List<List<Triple<DataverseName, String, String>>> dependencies;
- public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
- FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
+ public Channel(DataverseName dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+ FunctionSignature function, String duration, List<List<Triple<DataverseName, String, String>>> dependencies,
+ String channelBody) {
this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
this.function = function;
this.duration = duration;
this.resultsDatasetName = resultsDataset;
this.subscriptionsDatasetName = subscriptionsDataset;
this.channelBody = channelBody;
- if (this.function.getNamespace() == null) {
- this.function.setNamespace(dataverseName);
+ if (this.function.getDataverseName() == null) {
+ this.function.setDataverseName(dataverseName);
}
- functionAsPath = Arrays.asList(this.function.getNamespace(), this.function.getName(),
+ functionAsPath = new Triple<>(this.function.getDataverseName(), this.function.getName(),
Integer.toString(this.function.getArity()));
if (dependencies == null) {
this.dependencies = new ArrayList<>();
this.dependencies.add(new ArrayList<>());
this.dependencies.add(new ArrayList<>());
- List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName);
- List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName);
+ Triple<DataverseName, String, String> resultsList = new Triple<>(dataverseName, resultsDatasetName, null);
+ Triple<DataverseName, String, String> subscriptionList =
+ new Triple<>(dataverseName, subscriptionsDatasetName, null);
this.dependencies.get(0).add(resultsList);
this.dependencies.get(0).add(subscriptionList);
this.dependencies.get(1).add(functionAsPath);
@@ -80,7 +83,7 @@
return channelId;
}
- public List<List<List<String>>> getDependencies() {
+ public List<List<Triple<DataverseName, String, String>>> getDependencies() {
return dependencies;
}
@@ -100,7 +103,7 @@
return channelBody;
}
- public List<String> getFunctionAsPath() {
+ public Triple<DataverseName, String, String> getFunctionAsPath() {
return functionAsPath;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
index 679548c..5f8750e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
@@ -18,17 +18,18 @@
*/
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class ChannelSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
+ private static final long serialVersionUID = 2L;
+ private final DataverseName dataverse;
private final String channel;
- public ChannelSearchKey(String dataverse, String channel) {
+ public ChannelSearchKey(DataverseName dataverse, String channel) {
this.dataverse = dataverse;
this.channel = channel;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 175280e..aea119f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -15,16 +15,13 @@
package org.apache.asterix.bad.metadata;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
@@ -32,7 +29,7 @@
import org.apache.asterix.om.base.IACursor;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -50,35 +47,23 @@
// Payload field containing serialized feed.
public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2;
- private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
-
private transient OrderedListBuilder dependenciesListBuilder = new OrderedListBuilder();
private transient OrderedListBuilder dependencyListBuilder = new OrderedListBuilder();
private transient OrderedListBuilder dependencyNameListBuilder = new OrderedListBuilder();
+ private transient List<String> dependencySubnames = new ArrayList<>();
private transient AOrderedListType stringList = new AOrderedListType(BuiltinType.ASTRING, null);
private transient AOrderedListType ListofLists =
new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null);
public ChannelTupleTranslator(boolean getTuple) {
- super(getTuple, BADMetadataIndexes.NUM_FIELDS_CHANNEL_IDX);
+ super(getTuple, BADMetadataIndexes.CHANNEL_DATASET, CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
}
@Override
- public Channel getMetadataEntityFromTuple(ITupleReference frameTuple) throws HyracksDataException {
- byte[] serRecord = frameTuple.getFieldData(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordStartOffset = frameTuple.getFieldStart(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordLength = frameTuple.getFieldLength(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
- ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
- DataInput in = new DataInputStream(stream);
- ARecord channelRecord = recordSerDes.deserialize(in);
- return createChannelFromARecord(channelRecord);
- }
-
- private Channel createChannelFromARecord(ARecord channelRecord) {
- Channel channel = null;
- String dataverseName = ((AString) channelRecord
+ protected Channel createMetadataEntityFromARecord(ARecord channelRecord) {
+ String dataverseCanonicalName = ((AString) channelRecord
.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+ DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverseCanonicalName);
String channelName =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX))
.getStringValue();
@@ -88,12 +73,9 @@
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX))
.getStringValue();
- IACursor cursor = ((AOrderedList) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)).getCursor();
- List<String> functionSignature = new ArrayList<>();
- while (cursor.next()) {
- functionSignature.add(((AString) cursor.get()).getStringValue());
- }
+ AOrderedList function = ((AOrderedList) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX));
+ Triple<DataverseName, String, String> functionSignature = ProcedureTupleTranslator.getDependency(function);
String duration =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX))
@@ -101,46 +83,37 @@
IACursor dependenciesCursor = ((AOrderedList) channelRecord
.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX)).getCursor();
- List<List<List<String>>> dependencies = new ArrayList<>();
- AOrderedList dependencyList;
- AOrderedList qualifiedList;
- int i = 0;
+ List<List<Triple<DataverseName, String, String>>> dependencies = new ArrayList<>();
while (dependenciesCursor.next()) {
- dependencies.add(new ArrayList<>());
- dependencyList = (AOrderedList) dependenciesCursor.get();
- IACursor qualifiedDependencyCursor = dependencyList.getCursor();
- int j = 0;
+ List<Triple<DataverseName, String, String>> dependencyList = new ArrayList<>();
+ IACursor qualifiedDependencyCursor = ((AOrderedList) dependenciesCursor.get()).getCursor();
while (qualifiedDependencyCursor.next()) {
- qualifiedList = (AOrderedList) qualifiedDependencyCursor.get();
- IACursor qualifiedNameCursor = qualifiedList.getCursor();
- dependencies.get(i).add(new ArrayList<>());
- while (qualifiedNameCursor.next()) {
- dependencies.get(i).get(j).add(((AString) qualifiedNameCursor.get()).getStringValue());
- }
- j++;
+ Triple<DataverseName, String, String> dependency =
+ ProcedureTupleTranslator.getDependency((AOrderedList) qualifiedDependencyCursor.get());
+ dependencyList.add(dependency);
}
- i++;
-
+ dependencies.add(dependencyList);
}
String channelBody =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
.getStringValue();
- FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
- Integer.parseInt(functionSignature.get(2)));
+ FunctionSignature signature = new FunctionSignature(functionSignature.first, functionSignature.second,
+ Integer.parseInt(functionSignature.third));
- channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
+ return new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
dependencies, channelBody);
- return channel;
}
@Override
public ITupleReference getTupleFromMetadataEntity(Channel channel) throws HyracksDataException, MetadataException {
- // write the key in the first fields of the tuple
+ String dataverseCanonicalName = channel.getChannelId().getDataverseName().getCanonicalForm();
+ // write the key in the first fields of the tuple
tupleBuilder.reset();
- aString.setValue(channel.getChannelId().getDataverse());
+
+ aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
@@ -152,7 +125,7 @@
// write field 0
fieldValue.reset();
- aString.setValue(channel.getChannelId().getDataverse());
+ aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
@@ -178,7 +151,8 @@
OrderedListBuilder listBuilder = new OrderedListBuilder();
ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
listBuilder.reset(stringList);
- for (String pathPart : channel.getFunctionAsPath()) {
+ ProcedureTupleTranslator.getDependencySubNames(channel.getFunctionAsPath(), dependencySubnames);
+ for (String pathPart : dependencySubnames) {
itemValue.reset();
aString.setValue(pathPart);
stringSerde.serialize(aString, itemValue.getDataOutput());
@@ -197,12 +171,13 @@
// write field 6
dependenciesListBuilder.reset((AOrderedListType) BADMetadataRecordTypes.CHANNEL_RECORDTYPE
.getFieldTypes()[BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX]);
- List<List<List<String>>> dependenciesList = channel.getDependencies();
- for (List<List<String>> dependencies : dependenciesList) {
+ List<List<Triple<DataverseName, String, String>>> dependenciesList = channel.getDependencies();
+ for (List<Triple<DataverseName, String, String>> dependencies : dependenciesList) {
dependencyListBuilder.reset(ListofLists);
- for (List<String> dependency : dependencies) {
+ for (Triple<DataverseName, String, String> dependency : dependencies) {
dependencyNameListBuilder.reset(stringList);
- for (String subName : dependency) {
+ ProcedureTupleTranslator.getDependencySubNames(dependency, dependencySubnames);
+ for (String subName : dependencySubnames) {
itemValue.reset();
aString.setValue(subName);
stringSerde.serialize(aString, itemValue.getDataOutput());
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
index 527d65b..4b52a7a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
@@ -18,16 +18,17 @@
*/
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class DataverseBrokersSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
+ private static final long serialVersionUID = 2L;
+ private final DataverseName dataverse;
- public DataverseBrokersSearchKey(String dataverse) {
+ public DataverseBrokersSearchKey(DataverseName dataverse) {
this.dataverse = dataverse;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
index ffb3ab6..c27f7c0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
@@ -18,16 +18,17 @@
*/
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class DataverseChannelsSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
+ private static final long serialVersionUID = 2L;
+ private final DataverseName dataverse;
- public DataverseChannelsSearchKey(String dataverse) {
+ public DataverseChannelsSearchKey(DataverseName dataverse) {
this.dataverse = dataverse;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
index 9699e21..3f6c143 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
@@ -18,16 +18,17 @@
*/
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class DataverseProceduresSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
+ private static final long serialVersionUID = 2L;
+ private final DataverseName dataverse;
- public DataverseProceduresSearchKey(String dataverse) {
+ public DataverseProceduresSearchKey(DataverseName dataverse) {
this.dataverse = dataverse;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index dff4577..c989611 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -23,8 +23,10 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+import org.apache.hyracks.algebricks.common.utils.Triple;
public class Procedure implements IExtensionMetadataEntity {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
@@ -40,14 +42,15 @@
/*
Dependencies are stored as an array of size two:
element 0 is a list of dataset dependencies
- -stored as lists of [DataverseName, Dataset] for the datasets
+ -stored as triples of [DataverseName, Dataset, null] for the datasets
element 1 is a list of function dependencies
- -stored as lists of [DataverseName, FunctionName, Arity] for the functions
+ -stored as triples of [DataverseName, FunctionName, Arity] for the functions
*/
- private final List<List<List<String>>> dependencies;
+ private final List<List<Triple<DataverseName, String, String>>> dependencies;
- public Procedure(String dataverseName, String functionName, int arity, List<String> params, String type,
- String functionBody, String language, String duration, List<List<List<String>>> dependencies) {
+ public Procedure(DataverseName dataverseName, String functionName, int arity, List<String> params, String type,
+ String functionBody, String language, String duration,
+ List<List<Triple<DataverseName, String, String>>> dependencies) {
this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
this.params = params;
this.body = functionBody;
@@ -92,7 +95,7 @@
return duration;
}
- public List<List<List<String>>> getDependencies() {
+ public List<List<Triple<DataverseName, String, String>>> getDependencies() {
return dependencies;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
index 6456170..60fd77a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
@@ -18,18 +18,19 @@
*/
package org.apache.asterix.bad.metadata;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class ProcedureSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
+ private static final long serialVersionUID = 2L;
+ private final DataverseName dataverse;
private final String channel;
private final String arity;
- public ProcedureSearchKey(String dataverse, String channel, String arity) {
+ public ProcedureSearchKey(DataverseName dataverse, String channel, String arity) {
this.dataverse = dataverse;
this.channel = channel;
this.arity = arity;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
index 2bf00c0..d5395d2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -19,15 +19,12 @@
package org.apache.asterix.bad.metadata;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.common.exceptions.MetadataException;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
@@ -35,7 +32,7 @@
import org.apache.asterix.om.base.IACursor;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -55,34 +52,23 @@
// Payload field containing serialized Procedure.
public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
- private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
-
private transient OrderedListBuilder dependenciesListBuilder = new OrderedListBuilder();
private transient OrderedListBuilder dependencyListBuilder = new OrderedListBuilder();
private transient OrderedListBuilder dependencyNameListBuilder = new OrderedListBuilder();
+ private transient List<String> dependencySubnames = new ArrayList<>();
private transient AOrderedListType stringList = new AOrderedListType(BuiltinType.ASTRING, null);
- private transient AOrderedListType ListofLists =
+ private transient AOrderedListType listOfLists =
new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null);
protected ProcedureTupleTranslator(boolean getTuple) {
- super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX);
+ super(getTuple, BADMetadataIndexes.PROCEDURE_DATASET, PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
}
@Override
- public Procedure getMetadataEntityFromTuple(ITupleReference frameTuple) throws HyracksDataException {
- byte[] serRecord = frameTuple.getFieldData(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordStartOffset = frameTuple.getFieldStart(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordLength = frameTuple.getFieldLength(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
- ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
- DataInput in = new DataInputStream(stream);
- ARecord procedureRecord = recordSerDes.deserialize(in);
- return createProcedureFromARecord(procedureRecord);
- }
-
- private Procedure createProcedureFromARecord(ARecord procedureRecord) {
- String dataverseName = ((AString) procedureRecord
+ public Procedure createMetadataEntityFromARecord(ARecord procedureRecord) throws HyracksDataException {
+ String dataverseCanonicalName = ((AString) procedureRecord
.getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX)).getStringValue();
+ DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverseCanonicalName);
String procedureName = ((AString) procedureRecord
.getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX)).getStringValue();
String arity = ((AString) procedureRecord
@@ -112,39 +98,44 @@
IACursor dependenciesCursor = ((AOrderedList) procedureRecord
.getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX)).getCursor();
- List<List<List<String>>> dependencies = new ArrayList<>();
- AOrderedList dependencyList;
- AOrderedList qualifiedList;
- int i = 0;
+ List<List<Triple<DataverseName, String, String>>> dependencies = new ArrayList<>();
while (dependenciesCursor.next()) {
- dependencies.add(new ArrayList<>());
- dependencyList = (AOrderedList) dependenciesCursor.get();
- IACursor qualifiedDependencyCursor = dependencyList.getCursor();
- int j = 0;
+ List<Triple<DataverseName, String, String>> dependencyList = new ArrayList<>();
+ IACursor qualifiedDependencyCursor = ((AOrderedList) dependenciesCursor.get()).getCursor();
while (qualifiedDependencyCursor.next()) {
- qualifiedList = (AOrderedList) qualifiedDependencyCursor.get();
- IACursor qualifiedNameCursor = qualifiedList.getCursor();
- dependencies.get(i).add(new ArrayList<>());
- while (qualifiedNameCursor.next()) {
- dependencies.get(i).get(j).add(((AString) qualifiedNameCursor.get()).getStringValue());
- }
- j++;
+ Triple<DataverseName, String, String> dependency =
+ getDependency((AOrderedList) qualifiedDependencyCursor.get());
+ dependencyList.add(dependency);
}
- i++;
-
+ dependencies.add(dependencyList);
}
return new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition,
language, duration, dependencies);
+ }
+    /**
+     * Rebuilds a dependency triple from its stored form, an ordered list of strings.
+     * Item 0 is the dataverse name in canonical form; items 1 and 2 are optional and
+     * come back as null when the list is too short to hold them.
+     */
+    static Triple<DataverseName, String, String> getDependency(AOrderedList dependencySubnames) {
+        AString dataverseItem = (AString) dependencySubnames.getItem(0);
+        DataverseName dataverse = DataverseName.createFromCanonicalForm(dataverseItem.getStringValue());
+        int itemCount = dependencySubnames.size();
+        String secondPart = itemCount > 1 ? ((AString) dependencySubnames.getItem(1)).getStringValue() : null;
+        String thirdPart = itemCount > 2 ? ((AString) dependencySubnames.getItem(2)).getStringValue() : null;
+        return new Triple<>(dataverse, secondPart, thirdPart);
}
@Override
public ITupleReference getTupleFromMetadataEntity(Procedure procedure)
throws HyracksDataException, MetadataException {
+ String dataverseCanonicalName = procedure.getEntityId().getDataverseName().getCanonicalForm();
+
// write the key in the first 2 fields of the tuple
tupleBuilder.reset();
- aString.setValue(procedure.getEntityId().getDataverse());
+ aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
aString.setValue(procedure.getEntityId().getEntityName());
@@ -160,7 +151,7 @@
// write field 0
fieldValue.reset();
- aString.setValue(procedure.getEntityId().getDataverse());
+ aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
@@ -218,12 +209,13 @@
// write field 8
dependenciesListBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE
.getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX]);
- List<List<List<String>>> dependenciesList = procedure.getDependencies();
- for (List<List<String>> dependencies : dependenciesList) {
- dependencyListBuilder.reset(ListofLists);
- for (List<String> dependency : dependencies) {
+ List<List<Triple<DataverseName, String, String>>> dependenciesList = procedure.getDependencies();
+ for (List<Triple<DataverseName, String, String>> dependencies : dependenciesList) {
+ dependencyListBuilder.reset(listOfLists);
+ for (Triple<DataverseName, String, String> dependency : dependencies) {
dependencyNameListBuilder.reset(stringList);
- for (String subName : dependency) {
+ getDependencySubNames(dependency, dependencySubnames);
+ for (String subName : dependencySubnames) {
itemValue.reset();
aString.setValue(subName);
stringSerde.serialize(aString, itemValue.getDataOutput());
@@ -250,4 +242,14 @@
return tuple;
}
+    /** Refills {@code outList} with the canonical dataverse name followed by each non-null remaining part. */
+    static void getDependencySubNames(Triple<DataverseName, String, String> dependency, List<String> outList) {
+        outList.clear();
+        outList.add(dependency.first.getCanonicalForm());
+        for (String optionalPart : new String[] { dependency.second, dependency.third }) {
+            if (optionalPart != null) {
+                outList.add(optionalPart);
+            }
+        }
+    }
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index 629fab3..6dcf1fd 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -138,7 +138,7 @@
listener.setExecutorService(ses);
metadataProvider.getLocks().unlock();
- LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
+ entityId.getEntityName() + " was stopped by cluster failure. It has restarted.");
}
@@ -164,7 +164,7 @@
true);
metadataProvider.getLocks().unlock();
//Log that the procedure stopped by cluster restart. Procedure is available again now.
- LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
+ entityId.getEntityName()
+ " was lost with cluster failure and any repetitive instances have stopped. It is now available to run again.");
//TODO: allow repetitive procedures to restart execution automatically
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 79051b4..6a0b31c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -26,8 +26,10 @@
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -84,7 +86,7 @@
push = true;
}
DataSourceScanOperator subscriptionsScan;
- String channelDataverse;
+ DataverseName channelDataverse;
String channelName;
if (!push) {
@@ -102,7 +104,7 @@
}
DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
String datasetName = dds.getDataset().getDatasetName();
- if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+ if (!dds.getDataset().getItemTypeDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
|| !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
|| !datasetName.endsWith("Results")) {
return false;
@@ -179,7 +181,7 @@
}
private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
- LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push) {
+ LogicalVariable channelExecutionVar, DataverseName channelDataverse, String channelName, boolean push) {
NotifyBrokerOperator notifyBrokerOp =
new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push);
EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
@@ -192,7 +194,7 @@
private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
- DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+ DistributeResultOperator distributeOp, DataverseName channelDataverse, String channelName)
throws AlgebricksException {
//Find the assign operator to get the result type that we need
AbstractLogicalOperator assign = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
@@ -213,7 +215,7 @@
private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
- DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+ DistributeResultOperator distributeOp, DataverseName channelDataverse, String channelName)
throws AlgebricksException {
//Create the Distinct Op
@@ -354,7 +356,7 @@
if (op instanceof DataSourceScanOperator) {
if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
- if (dds.getDataset().getDataverseName().equals("Metadata")
+ if (dds.getDataset().getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
&& dds.getDataset().getDatasetName().equals("Broker")) {
return true;
}
@@ -367,7 +369,7 @@
if (op instanceof DataSourceScanOperator) {
if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
- if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+ if (dds.getDataset().getItemTypeDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
&& dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
return true;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index ac742e8..69fb7d4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -121,9 +121,9 @@
if (push) {
resultTitle = "\"results\"";
}
- String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
- + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
- + executionTimeString + "\", " + resultTitle + ":[";
+ String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverseName().getCanonicalForm()
+ + "\", \"channelName\":\"" + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime
+ + "\":\"" + executionTimeString + "\", " + resultTitle + ":[";
jsonStr += sendData.get(endpoint);
jsonStr = jsonStr.substring(0, jsonStr.length());
jsonStr += "]}";
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 58240fd..42c4223 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -97,7 +97,7 @@
@new
CreateChannelStatement ChannelSpecification() throws ParseException:
{
- Pair<Identifier,Identifier> nameComponents = null;
+ Pair<DataverseName,Identifier> nameComponents = null;
FunctionSignature appliedFunction = null;
CreateChannelStatement ccs = null;
String fqFunctionName = null;
@@ -133,7 +133,7 @@
Token endPos;
Statement functionBodyExpr;
Expression period = null;
- String currentDataverse = defaultDataverse;
+ DataverseName currentDataverse = defaultDataverse;
createNewScope();
}
{
@@ -200,7 +200,7 @@
CreateBrokerStatement BrokerSpecification() throws ParseException:
{
CreateBrokerStatement cbs = null;
- Pair<Identifier,Identifier> name = null;
+ Pair<DataverseName,Identifier> name = null;
String endPoint = null;
}
{
@@ -220,12 +220,12 @@
Statement ChannelSubscriptionStatement() throws ParseException:
{
Statement stmt = null;
- Pair<Identifier,Identifier> nameComponents = null;
+ Pair<DataverseName,Identifier> nameComponents = null;
List<Expression> argList = new ArrayList<Expression>();
Expression tmp = null;
String id = null;
String subscriptionId = null;
- Pair<Identifier,Identifier> brokerName = null;
+ Pair<DataverseName,Identifier> brokerName = null;
}
{
(