Introducing BAD-CQ

a. Active datasets
  Active datasets behave like regular datasets: records can be inserted,
  upserted, and deleted with DML statements, and the datasets can be fed
  by data feeds directly.  Records stored in active datasets contain an
  additional active timestamp field (implemented using meta-records).
  The active timestamp is assigned inside the storage layer, right
  before persistence.
  This patch introduced new syntactic components to enable creating
  active datasets.  It also created a BAD query translator to make sure
  all DML statements can operate on active datasets.
b. Continuous channels
  Continuous channels are built on repetitive channels but provide
  continuous query semantics. To ensure that, this patch introduced an
  active timestamp manager on each node to manage channel execution
  times locally. Each active timestamp manager is a node-local object
  that is tied to the JVM on its node. There is also an optimization
  rule for ensuring continuous query semantics in a distributed
  environment.
c. Active functions
  Active functions help users create continuous queries.
  They were added through the extension APIs.
d. BAD Islands
  As an application built on BAD-CQ, BAD islands show how we connect
  data channels to data feeds and share data between different BAD
  systems declaratively.
e. Cleanups and fixes
  This patch also cleaned up the BAD codebase and introduced tests for
  metadata, optimizer, and runtime. It fixed a type inference issue in
  InsertBrokerNotifierForChannelRule when there is a group-by in the
  query. Also, it optimized the broker notification delivery to use
  separate threads.

Change-Id: I77263c3fedd03205b83fe13978649b33fccda11c
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 59e1a8d..f2b6410 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -22,7 +22,9 @@
 import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
 import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
 import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
-import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.AbstractCreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateRepetitiveChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateContinuousChannelStatement;
 import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
 import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
@@ -30,6 +32,21 @@
 import org.apache.asterix.lang.sqlpp.parser.SqlppParseException;
 import org.apache.asterix.lang.sqlpp.parser.Token;
 
+@new
+<DEFAULT,IN_DBL_BRACE>
+TOKEN [IGNORE_CASE]:
+{
+  <BROKER : "broker">            // BAD keywords; IGNORE_CASE matches them in any letter case
+  |  <CHANNEL : "channel">
+  |  <PROCEDURE : "procedure">
+  |  <SUBSCRIBE : "subscribe">
+  |  <ACTIVE : "active">
+  | <UNSUBSCRIBE : "unsubscribe">
+  | <REPETITIVE : "repetitive">  // channel kind selected in CreateChannelStatement
+  | <CONTINUOUS : "continuous">  // channel kind selected in CreateChannelStatement
+  | <PERIOD : "period">          // introduces the period expression of channels and procedures
+  | <PUSH : "push">              // optional flag in front of CHANNEL
+}
 
 @merge
 Statement SingleStatement() throws ParseException:
@@ -59,9 +76,10 @@
   (
     // merge area 2
     before:
-    after:    | stmt = CreateChannelStatement()
-              | stmt = CreateBrokerStatement()
+    after:    | stmt = CreateChannelStatement(startToken)
+              | stmt = CreateBrokerStatement(startToken)
               | stmt = CreateProcedureStatement()
+              | stmt = ActiveStatementSpecification(startToken)
   )
   {
     // merge area 3
@@ -89,29 +107,160 @@
 }
 
 @new
-CreateChannelStatement CreateChannelStatement() throws ParseException:
+Statement ActiveStatementSpecification(Token startStmtToken) throws ParseException:
+{
+    // Statement produced by the active dataset production, returned unchanged.
+    Statement stmt = null;
+}
+{
+  // Reference the declared ACTIVE keyword token, as the other BAD keywords do,
+  // instead of an inline string literal.
+  <ACTIVE> stmt = ActiveDatasetSpecification(startStmtToken)
+  { return stmt; }
+}
+
+@new
+DatasetDecl ActiveDatasetSpecification(Token startStmtToken) throws ParseException:
+{
+  // Parses an active dataset declaration into a DatasetDecl of type INTERNAL and
+  // tags it with the source location of startStmtToken.
+  Pair<DataverseName,Identifier> nameComponents = null;
+  // Fixed reference to Metadata.ActiveRecordType, handed to every active DatasetDecl.
+  TypeExpression activeRecordTypeExpr = new TypeReferenceExpression(
+                new Pair<DataverseName,Identifier>(DataverseName.createBuiltinDataverseName("Metadata"), new Identifier("ActiveRecordType")));
+  TypeExpression datasetTypeExpr = null;
+  boolean ifNotExists = false;
+  Pair<List<Integer>, List<List<String>>> primaryKeyFields = null;
+  String nodeGroupName = null;
+  Map<String,String> hints = new HashMap<String,String>();
+  DatasetDecl stmt = null;
+  boolean autogenerated = false;
+  RecordConstructor withRecord = null;
+}
+{
+    Dataset() nameComponents = QualifiedName()
+    datasetTypeExpr = DatasetTypeSpecification()
+    ifNotExists = IfNotExists()
+    primaryKeyFields = PrimaryKey()
+    (<AUTOGENERATED> { autogenerated = true; } )?
+    (<ON> nodeGroupName = Identifier() )?
+    ( <HINTS> hints = Properties() )?
+    ( <WITH> withRecord = RecordConstructor() )?
+      {
+        // TODO: add filters on meta records
+        // The key field paths in second are passed before the integer list in first.
+        InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
+                                                          primaryKeyFields.first,
+                                                          autogenerated,
+                                                          null);
+        try {
+          stmt = new DatasetDecl(nameComponents.first,
+                                   nameComponents.second,
+                                   datasetTypeExpr,
+                                   activeRecordTypeExpr,
+                                   nodeGroupName != null ? new Identifier(nodeGroupName) : null,
+                                   hints,
+                                   DatasetType.INTERNAL,
+                                   idd,
+                                   withRecord,
+                                   ifNotExists);
+        } catch (CompilationException e) {
+          // Report construction failures as parse errors located at the statement start.
+          throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+        }
+        return addSourceLocation(stmt, startStmtToken);
+      }
+}
+
+
+@new
+AbstractCreateChannelStatement CreateChannelStatement(Token startStmtToken) throws ParseException:
+{
+  AbstractCreateChannelStatement ccs = null;
+}
+{
+  // Dispatch on the channel kind. The sub-productions are invoked as grammar
+  // non-terminals because JavaCC skips Java action blocks during lookahead.
+  (
+    <REPETITIVE> ccs = CreateRepetitiveChannel(startStmtToken)
+    | <CONTINUOUS> ccs = CreateContinuousChannel(startStmtToken)
+  )
+  { return ccs; }
+}
+
+@new
+CreateRepetitiveChannelStatement CreateRepetitiveChannel(Token startStmtToken) throws ParseException:
 {
   Pair<DataverseName,Identifier> nameComponents = null;
   FunctionSignature appliedFunction = null;
-  CreateChannelStatement ccs = null;
+  CreateRepetitiveChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
   boolean push = false;
 }
 {
-  (
-    "repetitive"
-    ( "push" { push = true; } )?
-     "channel"  nameComponents = QualifiedName()
-    <USING> appliedFunction = FunctionSignature()
-    "period" period = FunctionCallExpr()
+  ( <PUSH> { push = true; } )?
+  <CHANNEL>  nameComponents = QualifiedName()
+  <USING> appliedFunction = FunctionSignature()
+  <PERIOD> period = FunctionCallExpr()
+  {
+    ccs = new CreateRepetitiveChannelStatement(nameComponents.first,
+                                 nameComponents.second, appliedFunction, period, push);
+    // Attach the start token as the statement's source location, as
+    // CreateContinuousChannel and the drop statements do.
+    return addSourceLocation(ccs, startStmtToken);
+  }
+}
+
+
+@new
+CreateContinuousChannelStatement CreateContinuousChannel(Token startStmtToken) throws ParseException:
+{
+  // Channel related: the statement being built, its PERIOD expression and its PUSH flag
+  CreateContinuousChannelStatement ccs = null;
+  Expression period = null;
+  boolean push = false;
+  boolean ifNotExists = false; // NOTE(review): parsed below but never passed on - confirm this is intended
+  // Function related: the channel body is parsed and wrapped in a CreateFunctionStatement
+  FunctionSignature signature;
+  String functionBody;
+  Expression functionBodyExpr;
+  Token beginPos;
+  Token endPos;
+  FunctionName fctName = null;
+  TypeExpression returnType = null;
+  List<Pair<VarIdentifier,TypeExpression>> params = null;
+  DataverseName currentDataverse = defaultDataverse; // saved here, restored once the body is parsed
+}
+{
+     ( <PUSH> { push = true; } )?
+     <CHANNEL>
+     fctName = FunctionName()
+     {
+        defaultDataverse = fctName.dataverse; // switch to the channel's dataverse for the rest of the parse
+     }
+     ifNotExists = IfNotExists()
+     params = FunctionParameters()
+     <PERIOD> period = FunctionCallExpr()
+    <LEFTBRACE>
+      {
+        createNewScope();
+        beginPos = token; // the LEFTBRACE token just consumed
+      }
+      functionBodyExpr = FunctionBody()
+      returnType = FunctionReturnType()
+    <RIGHTBRACE>
     {
-      ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, period, push);
-    }
-  )
-    {
-      return ccs;
+      endPos = token; // the RIGHTBRACE token just consumed
+      functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+      signature = new FunctionSignature(fctName.dataverse, fctName.function, params.size());
+      getCurrentScope().addFunctionDescriptor(signature, false);
+      removeCurrentScope();
+      defaultDataverse = currentDataverse; // restore the dataverse saved on entry
+      ensureNoTypeDeclsInFunction(fctName.function, params, returnType, startStmtToken);
+      CreateFunctionStatement stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, false);
+      ccs =  new CreateContinuousChannelStatement(fctName.dataverse, new Identifier(fctName.function), period, push, stmt);
+      return addSourceLocation(ccs, startStmtToken);
     }
 }
 
@@ -131,7 +280,7 @@
   createNewScope();
 }
 {
-     "procedure" fctName = FunctionName()
+     <PROCEDURE> fctName = FunctionName()
      {
         defaultDataverse = fctName.dataverse;
      }
@@ -154,7 +303,7 @@
       removeCurrentScope();
       defaultDataverse = currentDataverse;
     }
-  ("period" period = FunctionCallExpr())?
+  (<PERIOD> period = FunctionCallExpr())?
   {
   List<VarIdentifier> paramListVariablesOnly = new ArrayList<VarIdentifier>();
   for(Pair<VarIdentifier,TypeExpression> p: paramList){
@@ -195,18 +344,24 @@
 }
 
 @new
-CreateBrokerStatement CreateBrokerStatement() throws ParseException:
+CreateBrokerStatement CreateBrokerStatement(Token startStmtToken) throws ParseException:
 {
   CreateBrokerStatement cbs = null;
   Pair<DataverseName,Identifier> name = null;
   String endPoint = null;
+  RecordConstructor withRecord = null;
 }
 {
   (
-    "broker"  name = QualifiedName()
+    <BROKER>  name = QualifiedName()
     <AT>  endPoint = StringLiteral()
+    ( <WITH> withRecord = RecordConstructor() )?
     {
-      cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+      try {
+        cbs = new CreateBrokerStatement(name.first, name.second, endPoint, withRecord);
+      } catch (CompilationException e) {
+        throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+      }
     }
   )
     {
@@ -227,7 +382,7 @@
 }
 {
   (
-  "subscribe" <TO> nameComponents = QualifiedName()
+  <SUBSCRIBE> <TO> nameComponents = QualifiedName()
    <LEFTPAREN> (tmp = Expression()
    {
       argList.add(tmp);
@@ -240,7 +395,7 @@
    {
       stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
    }
-   | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+   | <UNSUBSCRIBE> id = StringLiteral() <FROM> nameComponents = QualifiedName()
       {
         VariableExpr varExp = new VariableExpr(new VarIdentifier("$subscriptionPlaceholder"));
         getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
@@ -274,7 +429,7 @@
   boolean ifExists = false;
 }
 {
-  "channel" pairId = QualifiedName() ifExists = IfExists()
+  <CHANNEL> pairId = QualifiedName() ifExists = IfExists()
   {
     stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
     return addSourceLocation(stmt, startStmtToken);
@@ -289,7 +444,7 @@
   boolean ifExists = false;
 }
 {
-  "broker" pairId = QualifiedName() ifExists = IfExists()
+  <BROKER> pairId = QualifiedName() ifExists = IfExists()
   {
     stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
     return addSourceLocation(stmt, startStmtToken);
@@ -304,7 +459,7 @@
   boolean ifExists = false;
 }
 {
-  "procedure" funcSig = FunctionSignature() ifExists = IfExists()
+  <PROCEDURE> funcSig = FunctionSignature() ifExists = IfExists()
   {
     stmt = new ProcedureDropStatement(funcSig, ifExists);
     return addSourceLocation(stmt, startStmtToken);