Introducing BAD-CQ
a. Active datasets
Active datasets behave like regular datasets: their records can be
inserted/upserted/deleted with statements, and they can be fed directly
by data feeds. Records stored in active datasets contain an additional
active timestamp field (implemented using meta-records). The active
timestamp is assigned inside the storage, right before persistence.
This patch introduced new syntactic components to enable creating
active datasets. It also created a BAD query translator to make sure
all DML statements can operate on active datasets.
b. Continuous channels
Continuous channels are built on repetitive channels but provide
continuous query semantics. To ensure that, this patch introduced an
active timestamp manager on each node to manage channel execution
times locally. Each active timestamp manager is a node-local object
tied to the JVM of its node. There is also an optimization rule for
ensuring continuous query semantics in a distributed environment.
c. Active functions
Active functions are used for helping users create continuous queries.
They were added through the extension APIs.
d. BAD Islands
As an application built on BAD-CQ, BAD islands show how we connect
data channels to data feeds and share data between different BAD
systems declaratively.
e. Cleanups and fixes
This patch also cleaned up the BAD codebase and introduced tests for
metadata, optimizer, and runtime. It fixed a type inference issue in
InsertBrokerNotifierForChannelRule when there is a group-by in the
query. Also, it optimized the broker notification delivery to use
separate threads.
Change-Id: I77263c3fedd03205b83fe13978649b33fccda11c
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 59e1a8d..f2b6410 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -22,7 +22,9 @@
import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
-import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.AbstractCreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateRepetitiveChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateContinuousChannelStatement;
import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
@@ -30,6 +32,21 @@
import org.apache.asterix.lang.sqlpp.parser.SqlppParseException;
import org.apache.asterix.lang.sqlpp.parser.Token;
+@new
+<DEFAULT,IN_DBL_BRACE>
+TOKEN [IGNORE_CASE]:
+{
+ <BROKER : "broker">
+ | <CHANNEL : "channel">
+ | <PROCEDURE : "procedure">
+ | <SUBSCRIBE : "subscribe">
+ | <ACTIVE : "active">
+ | <UNSUBSCRIBE : "unsubscribe">
+ | <REPETITIVE : "repetitive">
+ | <CONTINUOUS : "continuous">
+ | <PERIOD : "period">
+ | <PUSH : "push">
+}
@merge
Statement SingleStatement() throws ParseException:
@@ -59,9 +76,10 @@
(
// merge area 2
before:
- after: | stmt = CreateChannelStatement()
- | stmt = CreateBrokerStatement()
+ after: | stmt = CreateChannelStatement(startToken)
+ | stmt = CreateBrokerStatement(startToken)
| stmt = CreateProcedureStatement()
+ | stmt = ActiveStatementSpecification(startToken)
)
{
// merge area 3
@@ -89,29 +107,160 @@
}
@new
-CreateChannelStatement CreateChannelStatement() throws ParseException:
+Statement ActiveStatementSpecification(Token startStmtToken) throws ParseException:
+{
+ Statement stmt = null; // statement built by the nested production and returned unchanged
+}
+{
+ <ACTIVE> // reference the named IGNORE_CASE token declared with the other BAD keywords, not an inline literal
+ stmt = ActiveDatasetSpecification(startStmtToken)
+ {
+ return stmt; // the only alternative after ACTIVE is a dataset declaration, so forward its result
+ }
+}
+
+@new
+DatasetDecl ActiveDatasetSpecification(Token startStmtToken) throws ParseException:
+{
+ Pair<DataverseName,Identifier> nameComponents = null; // dataverse and dataset name from QualifiedName
+ TypeExpression activeRecordTypeExpr = new TypeReferenceExpression( // fixed reference to Metadata.ActiveRecordType
+ new Pair<DataverseName,Identifier>(DataverseName.createBuiltinDataverseName("Metadata"), new Identifier("ActiveRecordType")));
+ TypeExpression datasetTypeExpr = null; // user-specified record type of the dataset
+ boolean ifNotExists = false;
+ Map<String,String> properties = null; // NOTE review: never read or assigned in this production
+ Pair<List<Integer>, List<List<String>>> primaryKeyFields = null; // result of PrimaryKey, split below into first and second
+ String nodeGroupName = null; // optional, set only when the ON clause is present
+ Map<String,String> hints = new HashMap<String,String>(); // empty by default, replaced when a HINTS clause is present
+ DatasetDecl stmt = null;
+ boolean autogenerated = false; // true only when AUTOGENERATED follows the primary key
+ RecordConstructor withRecord = null; // optional WITH record, passed through to DatasetDecl as is
+
+}
+{
+ Dataset() nameComponents = QualifiedName()
+ datasetTypeExpr = DatasetTypeSpecification()
+ ifNotExists = IfNotExists()
+ primaryKeyFields = PrimaryKey()
+ (<AUTOGENERATED> { autogenerated = true; } )?
+ (<ON> nodeGroupName = Identifier() )?
+ ( <HINTS> hints = Properties() )?
+ ( <WITH> withRecord = RecordConstructor() )?
+ {
+ // TODO: add filters on meta records; until then the last InternalDetailsDecl argument below stays null
+ InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
+ primaryKeyFields.first,
+ autogenerated,
+ null);
+ try{
+ stmt = new DatasetDecl(nameComponents.first,
+ nameComponents.second,
+ datasetTypeExpr,
+ activeRecordTypeExpr, // always the built-in active record type; presumably the meta-record type slot, confirm against DatasetDecl
+ nodeGroupName != null ? new Identifier(nodeGroupName) : null,
+ hints,
+ DatasetType.INTERNAL, // active datasets are always declared as INTERNAL here
+ idd,
+ withRecord,
+ ifNotExists);
+
+ } catch (CompilationException e){
+ throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage()); // rethrown as a parse error located at the statement start, message only
+ }
+ }
+ {
+ return addSourceLocation(stmt, startStmtToken);
+ }
+}
+
+
+@new
+AbstractCreateChannelStatement CreateChannelStatement(Token startStmtToken) throws ParseException:
+{
+ AbstractCreateChannelStatement ccs = null; // whichever concrete channel statement the chosen branch builds
+}
+{
+ ( // dispatch on the channel kind keyword; the nonterminals are real expansions so JavaCC can analyze them
+ <REPETITIVE> ccs = CreateRepetitiveChannel(startStmtToken)
+ | <CONTINUOUS> ccs = CreateContinuousChannel(startStmtToken)
+ )
+ {
+ return ccs; // start token was forwarded to the branch, nothing else is added here
+ }
+}
+
+@new
+CreateRepetitiveChannelStatement CreateRepetitiveChannel(Token startStmtToken) throws ParseException:
{
Pair<DataverseName,Identifier> nameComponents = null;
FunctionSignature appliedFunction = null;
- CreateChannelStatement ccs = null;
+ CreateRepetitiveChannelStatement ccs = null; // built once all clauses have been parsed
String fqFunctionName = null;
Expression period = null;
boolean push = false;
}
{
- (
- "repetitive"
- ( "push" { push = true; } )?
- "channel" nameComponents = QualifiedName()
- <USING> appliedFunction = FunctionSignature()
- "period" period = FunctionCallExpr()
+ ( <PUSH> { push = true; } )? // optional PUSH keyword; the REPETITIVE keyword itself is consumed by the caller
+ <CHANNEL> nameComponents = QualifiedName()
+ <USING> appliedFunction = FunctionSignature()
+ <PERIOD> period = FunctionCallExpr()
+ {
+ ccs = new CreateRepetitiveChannelStatement(nameComponents.first,
+ nameComponents.second, appliedFunction, period, push);
+ }
+ {
+ return addSourceLocation(ccs, startStmtToken); // attach the statement start location, as the continuous channel production does
+ }
+}
+
+
+@new
+CreateContinuousChannelStatement CreateContinuousChannel(Token startStmtToken) throws ParseException:
+{
+ // Channel related
+ CreateContinuousChannelStatement ccs = null;
+ Expression period = null;
+ boolean push = false;
+ boolean ifNotExists = false; // NOTE review: parsed below but not passed to either statement built in this production
+ // Function related: the channel body is parsed like a function declaration
+ FunctionSignature signature;
+ String functionBody; // raw text of the body, extracted between the recorded brace tokens
+ Expression functionBodyExpr; // parsed form of the same body
+ Token beginPos;
+ Token endPos;
+ FunctionName fctName = null; // supplies both the function signature and the channel name
+ TypeExpression returnType = null;
+ List<Pair<VarIdentifier,TypeExpression>> params = null;
+ DataverseName currentDataverse = defaultDataverse; // saved so the default dataverse can be restored after the body
+}
+{
+ ( <PUSH> { push = true; } )?
+ <CHANNEL>
+ fctName = FunctionName()
+ {
+ defaultDataverse = fctName.dataverse; // the following clauses are parsed with the channel's dataverse as default
+ }
+ ifNotExists = IfNotExists()
+ params = FunctionParameters()
+ <PERIOD> period = FunctionCallExpr()
+ <LEFTBRACE>
+ {
+ createNewScope(); // scope opened for the body, removed again after the closing brace
+ beginPos = token; // token consumed last at this point, i.e. the left brace
+ }
+ functionBodyExpr = FunctionBody()
+ returnType = FunctionReturnType()
+ <RIGHTBRACE>
{
- ccs = new CreateChannelStatement(nameComponents.first,
- nameComponents.second, appliedFunction, period, push);
- }
- )
- {
- return ccs;
+ endPos = token; // token consumed last at this point, i.e. the right brace
+ functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+ signature = new FunctionSignature(fctName.dataverse, fctName.function, params.size()); // arity is the declared parameter count
+ getCurrentScope().addFunctionDescriptor(signature, false);
+ removeCurrentScope();
+ defaultDataverse = currentDataverse; // restore the default dataverse saved in the declarations
+ ensureNoTypeDeclsInFunction(fctName.function, params, returnType, startStmtToken);
+ CreateFunctionStatement stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, false); // NOTE review: meaning of the trailing false is not visible here
+ ccs = new CreateContinuousChannelStatement(fctName.dataverse, new Identifier(fctName.function), period, push, stmt); // channel shares the function's dataverse and name and carries the function statement
+ return addSourceLocation(ccs, startStmtToken);
}
}
@@ -131,7 +280,7 @@
createNewScope();
}
{
- "procedure" fctName = FunctionName()
+ <PROCEDURE> fctName = FunctionName()
{
defaultDataverse = fctName.dataverse;
}
@@ -154,7 +303,7 @@
removeCurrentScope();
defaultDataverse = currentDataverse;
}
- ("period" period = FunctionCallExpr())?
+ (<PERIOD> period = FunctionCallExpr())?
{
List<VarIdentifier> paramListVariablesOnly = new ArrayList<VarIdentifier>();
for(Pair<VarIdentifier,TypeExpression> p: paramList){
@@ -195,18 +344,24 @@
}
@new
-CreateBrokerStatement CreateBrokerStatement() throws ParseException:
+CreateBrokerStatement CreateBrokerStatement(Token startStmtToken) throws ParseException:
{
CreateBrokerStatement cbs = null;
Pair<DataverseName,Identifier> name = null;
String endPoint = null;
+ RecordConstructor withRecord = null;
}
{
(
- "broker" name = QualifiedName()
+ <BROKER> name = QualifiedName()
<AT> endPoint = StringLiteral()
+ ( <WITH> withRecord = RecordConstructor() )?
{
- cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+ try {
+ cbs = new CreateBrokerStatement(name.first, name.second, endPoint, withRecord);
+ } catch (CompilationException e) {
+ throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+ }
}
)
{
@@ -227,7 +382,7 @@
}
{
(
- "subscribe" <TO> nameComponents = QualifiedName()
+ <SUBSCRIBE> <TO> nameComponents = QualifiedName()
<LEFTPAREN> (tmp = Expression()
{
argList.add(tmp);
@@ -240,7 +395,7 @@
{
stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
}
- | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+ | <UNSUBSCRIBE> id = StringLiteral() <FROM> nameComponents = QualifiedName()
{
VariableExpr varExp = new VariableExpr(new VarIdentifier("$subscriptionPlaceholder"));
getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
@@ -274,7 +429,7 @@
boolean ifExists = false;
}
{
- "channel" pairId = QualifiedName() ifExists = IfExists()
+ <CHANNEL> pairId = QualifiedName() ifExists = IfExists()
{
stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
return addSourceLocation(stmt, startStmtToken);
@@ -289,7 +444,7 @@
boolean ifExists = false;
}
{
- "broker" pairId = QualifiedName() ifExists = IfExists()
+ <BROKER> pairId = QualifiedName() ifExists = IfExists()
{
stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
return addSourceLocation(stmt, startStmtToken);
@@ -304,7 +459,7 @@
boolean ifExists = false;
}
{
- "procedure" funcSig = FunctionSignature() ifExists = IfExists()
+ <PROCEDURE> funcSig = FunctionSignature() ifExists = IfExists()
{
stmt = new ProcedureDropStatement(funcSig, ifExists);
return addSourceLocation(stmt, startStmtToken);