diff --git a/asterix-bad/src/main/resources/META-INF/services/org.apache.asterix.om.functions.IFunctionRegistrant b/asterix-bad/src/main/resources/META-INF/services/org.apache.asterix.om.functions.IFunctionRegistrant
new file mode 100644
index 0000000..fdb3724
--- /dev/null
+++ b/asterix-bad/src/main/resources/META-INF/services/org.apache.asterix.om.functions.IFunctionRegistrant
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.asterix.bad.function.BADFunctionRegistrant
\ No newline at end of file
diff --git a/asterix-bad/src/main/resources/asterix-build-configuration.xml b/asterix-bad/src/main/resources/asterix-build-configuration.xml
index 6007416..0f3cec9 100644
--- a/asterix-bad/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-bad/src/main/resources/asterix-build-configuration.xml
@@ -36,13 +36,13 @@
   </transactionLogDir>
   <extensions>
     <extension>
-      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+      <extensionClassName>org.apache.asterix.bad.extension.BADQueryTranslatorExtension</extensionClassName>
     </extension>
     <extension>
-      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+      <extensionClassName>org.apache.asterix.bad.extension.BADLangExtension</extensionClassName>
     </extension>
     <extension>
-      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+      <extensionClassName>org.apache.asterix.bad.extension.BADMetadataExtension</extensionClassName>
     </extension>
   </extensions>
   <property>
diff --git a/asterix-bad/src/main/resources/cc.conf b/asterix-bad/src/main/resources/cc.conf
index 371cbe8..9e9d26b 100644
--- a/asterix-bad/src/main/resources/cc.conf
+++ b/asterix-bad/src/main/resources/cc.conf
@@ -53,11 +53,11 @@
 messaging.frame.size=4096
 messaging.frame.count=512
 
-[extension/org.apache.asterix.bad.lang.BADQueryTranslatorExtension]
+[extension/org.apache.asterix.bad.extension.BADQueryTranslatorExtension]
 enabled = true
-[extension/org.apache.asterix.bad.lang.BADLangExtension]
+[extension/org.apache.asterix.bad.extension.BADLangExtension]
 enabled = true
-[extension/org.apache.asterix.bad.metadata.BADMetadataExtension]
+[extension/org.apache.asterix.bad.extension.BADMetadataExtension]
 enabled = true
-[extension/org.apache.asterix.bad.recovery.BADRecoveryExtension]
+[extension/org.apache.asterix.bad.extension.BADRecoveryExtension]
 enabled = true
\ No newline at end of file
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 59e1a8d..f2b6410 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -22,7 +22,9 @@
 import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
 import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
 import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
-import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.AbstractCreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateRepetitiveChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateContinuousChannelStatement;
 import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
 import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
@@ -30,6 +32,21 @@
 import org.apache.asterix.lang.sqlpp.parser.SqlppParseException;
 import org.apache.asterix.lang.sqlpp.parser.Token;
 
+@new
+<DEFAULT,IN_DBL_BRACE>
+TOKEN [IGNORE_CASE]:
+{
+    <BROKER      : "broker">
+  | <CHANNEL     : "channel">
+  | <PROCEDURE   : "procedure">
+  | <SUBSCRIBE   : "subscribe">
+  | <ACTIVE      : "active">
+  | <UNSUBSCRIBE : "unsubscribe">
+  | <REPETITIVE  : "repetitive">
+  | <CONTINUOUS  : "continuous">
+  | <PERIOD      : "period">
+  | <PUSH        : "push">
+}
 
 @merge
 Statement SingleStatement() throws ParseException:
@@ -59,9 +76,10 @@
   (
     // merge area 2
     before:
-    after:    | stmt = CreateChannelStatement()
-              | stmt = CreateBrokerStatement()
+    after:    | stmt = CreateChannelStatement(startToken)
+              | stmt = CreateBrokerStatement(startToken)
               | stmt = CreateProcedureStatement()
+              | stmt = ActiveStatementSpecification(startToken)
   )
   {
     // merge area 3
@@ -89,29 +107,160 @@
 }
 
 @new
-CreateChannelStatement CreateChannelStatement() throws ParseException:
+Statement ActiveStatementSpecification(Token startStmtToken) throws ParseException:
+{
+    Statement stmt = null; // result of the nested dataset production
+}
+{
+  <ACTIVE> // named IGNORE_CASE token, same as every other BAD keyword in this file
+   stmt = ActiveDatasetSpecification(startStmtToken)
+  {
+      return stmt;
+  }
+}
+
+@new
+DatasetDecl ActiveDatasetSpecification(Token startStmtToken) throws ParseException:
+{
+  Pair<DataverseName,Identifier> nameComponents = null;
+  TypeExpression activeRecordTypeExpr = new TypeReferenceExpression( // fixed reference to Metadata.ActiveRecordType
+                new Pair<DataverseName,Identifier>(DataverseName.createBuiltinDataverseName("Metadata"), new Identifier("ActiveRecordType")));
+  TypeExpression datasetTypeExpr = null;
+  boolean ifNotExists = false;
+  Map<String,String> properties = null; // REVIEW: never read in this production
+  Pair<List<Integer>, List<List<String>>> primaryKeyFields = null;
+  String nodeGroupName = null;
+  Map<String,String> hints = new HashMap<String,String>();
+  DatasetDecl stmt = null; // INTERNAL dataset declaration built from the clauses parsed below
+  boolean autogenerated = false;
+  RecordConstructor withRecord = null;
+
+}
+{
+    Dataset() nameComponents = QualifiedName()
+    datasetTypeExpr = DatasetTypeSpecification()
+    ifNotExists = IfNotExists()
+    primaryKeyFields = PrimaryKey()
+    (<AUTOGENERATED> { autogenerated = true; } )?
+    (<ON> nodeGroupName = Identifier() )?
+    ( <HINTS> hints = Properties() )?
+    ( <WITH> withRecord = RecordConstructor() )?
+      {
+        // TODO: add filters on meta records
+        InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
+                                                          primaryKeyFields.first,
+                                                          autogenerated,
+                                                          null);
+        try{
+        stmt = new DatasetDecl(nameComponents.first,
+                                   nameComponents.second,
+                                   datasetTypeExpr,
+                                   activeRecordTypeExpr,
+                                   nodeGroupName != null ? new Identifier(nodeGroupName) : null,
+                                   hints,
+                                   DatasetType.INTERNAL,
+                                   idd,
+                                   withRecord,
+                                   ifNotExists);
+
+        } catch (CompilationException e){ // REVIEW: only the message text is carried over, the cause is not chained
+           throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+        }
+      }
+    {
+      return addSourceLocation(stmt, startStmtToken); // attach the position of the statement start token
+    }
+}
+
+
+@new
+AbstractCreateChannelStatement CreateChannelStatement(Token startStmtToken) throws ParseException:
+{
+  AbstractCreateChannelStatement ccs = null; // whichever channel flavour was parsed
+}
+{
+  ( // sub-productions are invoked as grammar expansions so JavaCC can analyse them
+    <REPETITIVE> ccs = CreateRepetitiveChannel(startStmtToken)
+    | <CONTINUOUS> ccs = CreateContinuousChannel(startStmtToken)
+  )
+  {
+    return ccs;
+  }
+}
+
+@new
+CreateRepetitiveChannelStatement CreateRepetitiveChannel(Token startStmtToken) throws ParseException:
 {
   Pair<DataverseName,Identifier> nameComponents = null;
   FunctionSignature appliedFunction = null;
-  CreateChannelStatement ccs = null;
+  CreateRepetitiveChannelStatement ccs = null; // statement assembled from the clauses parsed below
   String fqFunctionName = null;
   Expression period = null;
   boolean push = false;
 }
 {
-  (
-    "repetitive"
-    ( "push" { push = true; } )?
-     "channel"  nameComponents = QualifiedName()
-    <USING> appliedFunction = FunctionSignature()
-    "period" period = FunctionCallExpr()
+  ( <PUSH> { push = true; } )?
+  <CHANNEL>  nameComponents = QualifiedName()
+  <USING> appliedFunction = FunctionSignature()
+  <PERIOD> period = FunctionCallExpr()
+  {
+    ccs = new CreateRepetitiveChannelStatement(nameComponents.first,
+                                 nameComponents.second, appliedFunction, period, push);
+  }
+  {
+    return addSourceLocation(ccs, startStmtToken); // record the statement start, as the continuous channel production does
+  }
+}
+
+
+@new
+CreateContinuousChannelStatement CreateContinuousChannel(Token startStmtToken) throws ParseException:
+{
+  // Channel related: the result wraps a function statement built from the inline body
+  CreateContinuousChannelStatement ccs = null;
+  Expression period = null;
+  boolean push = false;
+  boolean ifNotExists = false; // REVIEW: parsed below but not handed to any constructor in this production
+  // Function related
+  FunctionSignature signature;
+  String functionBody;
+  Expression functionBodyExpr;
+  Token beginPos;
+  Token endPos;
+  FunctionName fctName = null;
+  TypeExpression returnType = null;
+  List<Pair<VarIdentifier,TypeExpression>> params = null;
+  DataverseName currentDataverse = defaultDataverse; // saved so it can be restored after the body is parsed
+}
+{
+     ( <PUSH> { push = true; } )?
+     <CHANNEL>
+     fctName = FunctionName()
+     {
+        defaultDataverse = fctName.dataverse; // temporarily switched, restored once the body has been parsed
+     }
+     ifNotExists = IfNotExists()
+     params = FunctionParameters()
+     <PERIOD> period = FunctionCallExpr()
+    <LEFTBRACE>
+      {
+        createNewScope();
+        beginPos = token; // most recently consumed token, here the opening brace
+      }
+      functionBodyExpr = FunctionBody()
+      returnType = FunctionReturnType() // REVIEW: parsed inside the braces after the body, confirm this is intended
+    <RIGHTBRACE>
     {
-      ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, period, push);
-    }
-  )
-    {
-      return ccs;
+      endPos = token; // most recently consumed token, here the closing brace
+      functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+      signature = new FunctionSignature(fctName.dataverse, fctName.function, params.size());
+      getCurrentScope().addFunctionDescriptor(signature, false);
+      removeCurrentScope();
+      defaultDataverse = currentDataverse; // undo the switch made after the channel name was read
+      ensureNoTypeDeclsInFunction(fctName.function, params, returnType, startStmtToken);
+      CreateFunctionStatement stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, false);
+      ccs =  new CreateContinuousChannelStatement(fctName.dataverse, new Identifier(fctName.function), period, push, stmt);
+      return addSourceLocation(ccs, startStmtToken);
     }
 }
 
@@ -131,7 +280,7 @@
   createNewScope();
 }
 {
-     "procedure" fctName = FunctionName()
+     <PROCEDURE> fctName = FunctionName()
      {
         defaultDataverse = fctName.dataverse;
      }
@@ -154,7 +303,7 @@
       removeCurrentScope();
       defaultDataverse = currentDataverse;
     }
-  ("period" period = FunctionCallExpr())?
+  (<PERIOD> period = FunctionCallExpr())?
   {
   List<VarIdentifier> paramListVariablesOnly = new ArrayList<VarIdentifier>();
   for(Pair<VarIdentifier,TypeExpression> p: paramList){
@@ -195,18 +344,24 @@
 }
 
 @new
-CreateBrokerStatement CreateBrokerStatement() throws ParseException:
+CreateBrokerStatement CreateBrokerStatement(Token startStmtToken) throws ParseException:
 {
   CreateBrokerStatement cbs = null;
   Pair<DataverseName,Identifier> name = null;
   String endPoint = null;
+  RecordConstructor withRecord = null;
 }
 {
   (
-    "broker"  name = QualifiedName()
+    <BROKER>  name = QualifiedName()
     <AT>  endPoint = StringLiteral()
+    ( <WITH> withRecord = RecordConstructor() )?
     {
-      cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+      try {
+        cbs = new CreateBrokerStatement(name.first, name.second, endPoint, withRecord);
+      } catch (CompilationException e) {
+        throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+      }
     }
   )
     {
@@ -227,7 +382,7 @@
 }
 {
   (
-  "subscribe" <TO> nameComponents = QualifiedName()
+  <SUBSCRIBE> <TO> nameComponents = QualifiedName()
    <LEFTPAREN> (tmp = Expression()
    {
       argList.add(tmp);
@@ -240,7 +395,7 @@
    {
       stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
    }
-   | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+   | <UNSUBSCRIBE> id = StringLiteral() <FROM> nameComponents = QualifiedName()
       {
         VariableExpr varExp = new VariableExpr(new VarIdentifier("$subscriptionPlaceholder"));
         getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
@@ -274,7 +429,7 @@
   boolean ifExists = false;
 }
 {
-  "channel" pairId = QualifiedName() ifExists = IfExists()
+  <CHANNEL> pairId = QualifiedName() ifExists = IfExists()
   {
     stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
     return addSourceLocation(stmt, startStmtToken);
@@ -289,7 +444,7 @@
   boolean ifExists = false;
 }
 {
-  "broker" pairId = QualifiedName() ifExists = IfExists()
+  <BROKER> pairId = QualifiedName() ifExists = IfExists()
   {
     stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
     return addSourceLocation(stmt, startStmtToken);
@@ -304,7 +459,7 @@
   boolean ifExists = false;
 }
 {
-  "procedure" funcSig = FunctionSignature() ifExists = IfExists()
+  <PROCEDURE> funcSig = FunctionSignature() ifExists = IfExists()
   {
     stmt = new ProcedureDropStatement(funcSig, ifExists);
     return addSourceLocation(stmt, startStmtToken);
diff --git a/asterix-bad/src/main/resources/log4j2-bad.xml b/asterix-bad/src/main/resources/log4j2-bad.xml
new file mode 100644
index 0000000..fcbb6bb
--- /dev/null
+++ b/asterix-bad/src/main/resources/log4j2-bad.xml
@@ -0,0 +1,45 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+    <File name="InfoLog" fileName="target/info.log">
+        <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </File>
+  </Appenders>
+  <Loggers>
+    <Root level="INFO">
+      <AppenderRef ref="InfoLog"/>
+    </Root>
+    <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
+    <Logger name="org.apache.hyracks" level="INFO"/>
+    <Logger name="org.apache.asterix" level="INFO"/>
+    <!-- Declared once: a second Logger with the same name replaces the first,
+         which dropped the Console appender. InfoLog is not listed again because
+         events already reach it through the additive Root logger. -->
+    <Logger name="org.apache.hyracks.algebricks" level="TRACE">
+      <AppenderRef ref="Console"/>
+    </Logger>
+    <Logger name="org.apache.asterix.bad.metadata" level="WARN">
+      <AppenderRef ref="Console"/>
+    </Logger>
+  </Loggers>
+</Configuration>
