Initial Commit
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6e47454
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,44 @@
+invIndex*
+opttest
+primaryBTree*
+target
+.classpath
+.settings
+.project
+ClusterControllerService
+rttest
+mdtest
+ittest
+asterix_logs
+build
+*-coredump
+*.pyc
+*.iml
+.idea
+*.ipr
+*.iws
+git.properties
+actual
+exception
+expected
+teststore1
+teststore2
+dev1
+dev2
+dev3
+dev4
+derby.log
+hadoop-conf-tmp
+metastore_db
+teststore
+output
+tmp
+dist
+*~
+.DS_Store
+*.swp
+.m2*

+/target/
+/target/
+/target/
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..25cd9c3
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,211 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.    See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.asterix</groupId>
+    <artifactId>apache-asterixdb</artifactId>
+    <version>0.8.9-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-opt</artifactId>
+    <properties>
+    <asterix.version>0.8.9-SNAPSHOT</asterix.version>
+  </properties>
+    <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.asterix</groupId>
+        <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
+        <version>${asterix.version}</version>
+        <configuration>
+          <base>${project.basedir}</base>
+          <gbase>../asterix-lang-aql/src/main/javacc/AQL.jj</gbase>
+          <gextension>src/main/resources/lang-extension/lang.txt</gextension>
+          <output>target/generated-resources/javacc/grammar.jj</output>
+          <parserClassName>BADAQLParser</parserClassName>
+          <packageName>org.apache.asterix.bad.lang</packageName>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>grammarix</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>javacc-maven-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>javacc</id>
+            <goals>
+              <goal>javacc</goal>
+            </goals>
+            <configuration>
+              <isStatic>false</isStatic>
+              <javaUnicodeEscape>true</javaUnicodeEscape>
+              <sourceDirectory>target/generated-resources/javacc</sourceDirectory>
+            </configuration>
+          </execution>
+          <execution>
+            <id>javacc-jjdoc</id>
+            <goals>
+              <goal>jjdoc</goal>
+            </goals>
+            <phase>process-sources</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.build.directory}/generated-sources/javacc/</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+            <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.asterix</groupId>
+                    <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
+                    <versionRange>[${asterix.version},)</versionRange>
+                    <goals>
+                      <goal>grammarix</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>javacc-maven-plugin</artifactId>
+                    <versionRange>[2.6,)</versionRange>
+                    <goals>
+                      <goal>javacc</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-test-support</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-runtime</artifactId>
+      <version>${project.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-compiler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-hdfs-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-test-framework</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-active</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-algebra</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-app</artifactId>
+      <version>${asterix.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-app</artifactId>
+      <version>${asterix.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>${asterix.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/src/main/java/org/apache/asterix/bad/BADConstants.java b/src/main/java/org/apache/asterix/bad/BADConstants.java
new file mode 100644
index 0000000..d03df33
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+public interface BADConstants {
+    final String SubscriptionId = "subscriptionId";
+    final String BrokerName = "BrokerName";
+    final String ChannelName = "ChannelName";
+    final String DataverseName = "DataverseName";
+    final String BrokerEndPoint = "BrokerEndPoint";
+    final String DeliveryTime = "deliveryTime";
+    final String ResultId = "resultId";
+    final String ChannelExecutionTime = "channelExecutionTime";
+    final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+    final String ChannelResultsType = "ChannelResultsType";
+    final String ResultsDatasetName = "ResultsDatasetName";
+    final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+    final String CHANNEL_EXTENSION_NAME = "Channel";
+    final String BROKER_KEYWORD = "Broker";
+    final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
+    final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
+    final String subscriptionEnding = "Subscriptions";
+    final String resultsEnding = "Results";
+    final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
+    final String BAD_DATAVERSE_NAME = "Metadata";
+    final String Duration = "Duration";
+    final String Function = "Function";
+
+    public enum ChannelJobType {
+        REPETITIVE
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java b/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
new file mode 100644
index 0000000..da0c43b
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants.ChannelJobType;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class ChannelJobInfo extends ActiveJob {
+
+    private static final long serialVersionUID = 1L;
+    private List<String> locations;
+
+    public ChannelJobInfo(EntityId entityId, JobId jobId, ActivityState state, JobSpecification spec) {
+        super(entityId, jobId, state, ChannelJobType.REPETITIVE, spec);
+    }
+
+    public List<String> getLocations() {
+        return locations;
+
+    }
+
+    public void setLocations(List<String> locations) {
+        this.locations = locations;
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/src/main/java/org/apache/asterix/bad/ChannelJobService.java
new file mode 100644
index 0000000..8310f70
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AUUID;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.json.JSONException;
+
+/**
+ * Provides functionality for running channel jobs and communicating with Brokers
+ */
+public class ChannelJobService {
+
+    private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
+    IHyracksClientConnection hcc;
+
+    public ChannelJobService() throws AsterixException {
+
+    }
+
+    public void runChannelJob(JobSpecification channeljobSpec, String strIP, int port) throws Exception {
+        hcc = new HyracksConnection(strIP, port);
+        JobId jobId = hcc.startJob(channeljobSpec);
+        hcc.waitForCompletion(jobId);
+    }
+
+    public void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
+            AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
+        String formattedString;
+        try {
+            formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
+        } catch (JSONException e) {
+            throw new HyracksDataException(e);
+        }
+        sendMessage(brokerEndpoint, formattedString);
+    }
+
+    public String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime)
+            throws JSONException {
+        String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
+                + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+                + channelExecutionTime + "\", \"subscriptionIds\":[";
+        for (int i = 0; i < subscriptionIds.size(); i++) {
+            AUUID subId = (AUUID) subscriptionIds.getItem(i);
+            String subString = subId.toSimpleString();
+            JSON += "\"" + subString + "\"";
+            if (i < subscriptionIds.size() - 1) {
+                JSON += ",";
+            }
+        }
+        JSON += "]}";
+        return JSON;
+
+    }
+
+    public static void sendMessage(String targetURL, String urlParameters) {
+        HttpURLConnection connection = null;
+        try {
+            //Create connection
+            URL url = new URL(targetURL);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+
+            connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
+            connection.setRequestProperty("Content-Language", "en-US");
+
+            connection.setUseCaches(false);
+            connection.setDoOutput(true);
+
+            //Send message
+            try {
+                DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+                wr.writeBytes(urlParameters);
+                wr.close();
+            } catch (Exception e) {
+                throw new AsterixException("Broker connection failed to write", e);
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                int responseCode = connection.getResponseCode();
+                LOGGER.info("\nSending 'POST' request to URL : " + url);
+                LOGGER.info("Post parameters : " + urlParameters);
+                LOGGER.info("Response Code : " + responseCode);
+            }
+
+            BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+            String inputLine;
+            StringBuffer response = new StringBuffer();
+
+            while ((inputLine = in.readLine()) != null) {
+                response.append(inputLine);
+            }
+            in.close();
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                System.out.println(response.toString());
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelJobService";
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
new file mode 100644
index 0000000..42036af
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.aql.rewrites.AQLRewriterFactory;
+import org.apache.asterix.lang.aql.visitor.AQLAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IParserFactory;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
+import org.apache.asterix.translator.AqlExpressionToPlanTranslatorFactory;
+
+public class BADCompilationProvider implements ILangCompilationProvider {
+
+    @Override
+    public IParserFactory getParserFactory() {
+        return new BADParserFactory();
+    }
+
+    @Override
+    public IRewriterFactory getRewriterFactory() {
+        return new AQLRewriterFactory();
+    }
+
+    @Override
+    public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
+        return new AQLAstPrintVisitorFactory();
+    }
+
+    @Override
+    public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
+        return new AqlExpressionToPlanTranslatorFactory();
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
new file mode 100644
index 0000000..9832fe6
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.BrokerSearchKey;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelSearchKey;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public class BADLangExtension implements ILangExtension {
+
+    public static final ExtensionId EXTENSION_ID = new ExtensionId(BADLangExtension.class.getSimpleName(), 0);
+
+    @Override
+    public ExtensionId getId() {
+        return EXTENSION_ID;
+    }
+
+    @Override
+    public void configure(List<Pair<String, String>> args) {
+    }
+
+    @Override
+    public ILangCompilationProvider getLangCompilationProvider(Language lang) {
+        switch (lang) {
+            case AQL:
+                return new BADCompilationProvider();
+            case SQLPP:
+                return new SqlppCompilationProvider();
+            default:
+                return null;
+        }
+    }
+
+    @Override
+    public ExtensionKind getExtensionKind() {
+        return ExtensionKind.LANG;
+    }
+
+    @Override
+    public boolean unnestToDataScan(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
+            UnnestOperator unnestOp, ILogicalExpression unnestExpr, AbstractFunctionCallExpression functionCallExpr)
+                    throws AlgebricksException {
+        // TODO I dont need this?????
+        return false;
+    }
+
+    public static Broker getBroker(MetadataTransactionContext mdTxnCtx, String dataverseName, String brokerName)
+            throws AlgebricksException {
+        BrokerSearchKey brokerSearchKey = new BrokerSearchKey(dataverseName, brokerName);
+        List<Broker> brokers = MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
+        if (brokers.isEmpty()) {
+            return null;
+        } else if (brokers.size() > 1) {
+            throw new AlgebricksException("Broker search key returned more than one broker");
+        } else {
+            return brokers.get(0);
+        }
+    }
+
+    public static Channel getChannel(MetadataTransactionContext mdTxnCtx, String dataverseName, String channelName)
+            throws AlgebricksException {
+        ChannelSearchKey channelSearchKey = new ChannelSearchKey(dataverseName, channelName);
+        List<Channel> channels = MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
+        if (channels.isEmpty()) {
+            return null;
+        } else if (channels.size() > 1) {
+            throw new AlgebricksException("Channel search key returned more than one channel");
+        } else {
+            return channels.get(0);
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java b/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
new file mode 100644
index 0000000..58bca17
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.io.Reader;
+
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
+
+public class BADParserFactory implements IParserFactory {
+
+    @Override
+    public IParser createParser(String query) {
+        return new BADAQLParser(query);
+    }
+
+    @Override
+    public IParser createParser(Reader reader) {
+        return new BADAQLParser(reader);
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
new file mode 100644
index 0000000..4198230
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class BADQueryTranslatorExtension implements IStatementExecutorExtension {
+
+    public static final ExtensionId BAD_QUERY_TRANSLATOR_EXTENSION_ID = new ExtensionId(
+            BADQueryTranslatorExtension.class.getSimpleName(), 0);
+
+    private static class LazyHolder {
+        private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory(
+                (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager());
+    }
+
+    @Override
+    public ExtensionId getId() {
+        return BAD_QUERY_TRANSLATOR_EXTENSION_ID;
+    }
+
+    @Override
+    public void configure(List<Pair<String, String>> args) {
+    }
+
+    @Override
+    public IStatementExecutorFactory getQueryTranslatorFactory() {
+        return LazyHolder.INSTANCE;
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
new file mode 100644
index 0000000..b8a6050
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+
+public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
+
+    public BADQueryTranslatorFactory(CompilerExtensionManager ccExtensionManager) {
+        super(ccExtensionManager);
+    }
+
+    @Override
+    public QueryTranslator create(List<Statement> statements, SessionConfig conf,
+            ILangCompilationProvider compilationProvider) {
+        return new BADStatementExecutor(statements, conf, compilationProvider, cExtensionManager);
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
new file mode 100644
index 0000000..1d4864f
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+
+public class BADStatementExecutor extends QueryTranslator {
+
+    public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
+            ILangCompilationProvider compliationProvider, CompilerExtensionManager ccExtensionManager) {
+        super(aqlStatements, conf, compliationProvider, ccExtensionManager);
+    }
+
+    /*
+    @Override
+    protected void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        super(metadataProvider, stmt, hcc);
+        //TODO: need to drop channels and brokers
+        //TODO: need to check if datasets or functions are in use by channels
+    }*/
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
new file mode 100644
index 0000000..53b5ff7
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class BrokerDropStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier brokerName;
+    private boolean ifExists;
+
+    public BrokerDropStatement(Identifier dataverseName, Identifier brokerName, boolean ifExists) {
+        this.brokerName = brokerName;
+        this.dataverseName = dataverseName;
+        this.ifExists = ifExists;
+    }
+
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        //TODO: dont drop a broker that's being used
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
+            if (broker == null) {
+                throw new AlgebricksException("A broker with this name " + brokerName + " doesn't exist.");
+            }
+            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, broker);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
new file mode 100644
index 0000000..0faefa3
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.common.statement.DropDatasetStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelDropStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private boolean ifExists;
+
+    public ChannelDropStatement(Identifier dataverseName, Identifier channelName, boolean ifExists) {
+        this.dataverseName = dataverseName;
+        this.channelName = channelName;
+        this.ifExists = ifExists;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        boolean txnActive = false;
+        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+                .getActiveEntityListener(entityId);
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+        boolean subscriberRegistered = false;
+        Channel channel = null;
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            txnActive = true;
+            channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            txnActive = false;
+            if (channel == null) {
+                if (ifExists) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no channel with this name " + channelName + ".");
+                }
+            }
+            if (listener != null) {
+                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+            }
+            if (!subscriberRegistered) {
+                throw new AsterixException("Channel " + channelName + " is not running");
+            }
+
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+                    .getMessageBroker();
+
+            ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
+            Set<String> ncs = new HashSet<>(cInfo.getLocations());
+            AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
+                    ncs.toArray(new String[ncs.size()]));
+            int partition = 0;
+            for (String location : locations.getLocations()) {
+                messageBroker.sendApplicationMessageToNC(
+                        new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
+                                new ActiveRuntimeId(channel.getChannelId(),
+                                        RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
+                        location);
+            }
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);
+
+            //Drop the Channel Datasets
+            //TODO: Need to find some way to handle if this fails.
+            //TODO: Prevent datasets for Channels from being dropped elsewhere
+            DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getResultsDatasetName()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+            dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getSubscriptionsDataset()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+            if (subscriberRegistered) {
+                listener.deregisterEventSubscriber(eventSubscriber);
+            }
+
+            //Remove the Channel Metadata
+            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (txnActive) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
new file mode 100644
index 0000000..7d0cb1a
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldBinding;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelSubscribeStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final Identifier brokerDataverseName;
+    private final Identifier brokerName;
+    private final List<Expression> argList;
+    private final String subscriptionId;
+    private final int varCounter;
+
+    public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
+            int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.brokerDataverseName = brokerDataverseName;
+        this.brokerName = brokerName;
+        this.argList = argList;
+        this.subscriptionId = subscriptionId;
+        this.varCounter = varCounter;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getBrokerDataverseName() {
+        return brokerDataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    public List<Expression> getArgList() {
+        return argList;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.QUERY;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            if (channel == null) {
+                throw new AsterixException("There is no channel with this name " + channelName + ".");
+            }
+            Broker broker = BADLangExtension.getBroker(mdTxnCtx, brokerDataverse, brokerName.getValue());
+            if (broker == null) {
+                throw new AsterixException("There is no broker with this name " + brokerName + ".");
+            }
+
+            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+
+            if (argList.size() != channel.getFunction().getArity()) {
+                throw new AsterixException("Channel expected " + channel.getFunction().getArity()
+                        + " parameters but got " + argList.size());
+            }
+
+            Query subscriptionTuple = new Query(false);
+
+            List<FieldBinding> fb = new ArrayList<FieldBinding>();
+            LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
+            Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
+            fb.add(new FieldBinding(leftExpr, rightExpr));
+
+            leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
+            rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName()));
+            fb.add(new FieldBinding(leftExpr, rightExpr));
+
+            if (subscriptionId != null) {
+                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+
+                List<Expression> UUIDList = new ArrayList<Expression>();
+                UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
+                FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+                FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+                        function.getArity());
+                CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+                rightExpr = UUIDCall;
+                fb.add(new FieldBinding(leftExpr, rightExpr));
+            }
+
+            for (int i = 0; i < argList.size(); i++) {
+                leftExpr = new LiteralExpr(new StringLiteral("param" + i));
+                rightExpr = argList.get(i);
+                fb.add(new FieldBinding(leftExpr, rightExpr));
+            }
+            RecordConstructor recordCon = new RecordConstructor(fb);
+            subscriptionTuple.setBody(recordCon);
+
+            subscriptionTuple.setVarCounter(varCounter);
+
+            if (subscriptionId == null) {
+                List<String> returnField = new ArrayList<>();
+                returnField.add(BADConstants.SubscriptionId);
+                metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                metadataProvider.setResultAsyncMode(
+                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                InsertStatement insert = new InsertStatement(new Identifier(dataverse),
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, false, returnField);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
+                        resultDelivery, stats, false);
+            } else {
+                UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
+                        resultDelivery, stats, false);
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
new file mode 100644
index 0000000..50696b4
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.OperatorExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelUnsubscribeStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final String subscriptionId;
+    private final int varCounter;
+    private VariableExpr vars;
+    private List<String> dataverses;
+    private List<String> datasets;
+
+    public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
+            String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
+        this.vars = vars;
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.subscriptionId = subscriptionId;
+        this.varCounter = varCounter;
+        this.dataverses = dataverses;
+        this.datasets = datasets;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public VariableExpr getVariableExpr() {
+        return vars;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public String getsubScriptionId() {
+        return subscriptionId;
+    }
+
+    public List<String> getDataverses() {
+        return dataverses;
+    }
+
+    public List<String> getDatasets() {
+        return datasets;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            if (channel == null) {
+                throw new AsterixException("There is no channel with this name " + channelName + ".");
+            }
+
+            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+
+            //Need a condition to say subscription-id = sid
+            OperatorExpr condition = new OperatorExpr();
+            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+            condition.addOperand(fa);
+            condition.setCurrentop(true);
+            condition.addOperator("=");
+
+            List<Expression> UUIDList = new ArrayList<Expression>();
+            UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
+
+            FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+            FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+                    function.getArity());
+            CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+            condition.addOperand(UUIDCall);
+
+            DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
+                    new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets);
+            AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
+            delete.accept(visitor, null);
+
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
new file mode 100644
index 0000000..a4d0eaf
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CreateBrokerStatement implements IExtensionStatement {
+
+    private static final Logger LOGGER = Logger.getLogger(CreateBrokerStatement.class.getName());
+    private final Identifier dataverseName;
+    private final Identifier brokerName;
+    private String endPointName;
+
+    public CreateBrokerStatement(Identifier dataverseName, Identifier brokerName, String endPointName) {
+        this.brokerName = brokerName;
+        this.dataverseName = dataverseName;
+        this.endPointName = endPointName;
+    }
+
+    public String getEndPointName() {
+        return endPointName;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
+            if (broker != null) {
+                throw new AlgebricksException("A broker with this name " + brokerName + " already exists.");
+            }
+            broker = new Broker(dataverse, brokerName.getValue(), endPointName);
+            MetadataManager.INSTANCE.addEntity(mdTxnCtx, broker);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (mdTxnCtx != null) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            LOGGER.log(Level.WARNING, "Failed creating a broker", e);
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
new file mode 100644
index 0000000..824e725
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.file.JobSpecificationUtils;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DatasetDecl;
+import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.util.JobUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+
+public class CreateChannelStatement implements IExtensionStatement {
+
+    private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final FunctionSignature function;
+    private final CallExpr period;
+    private String duration;
+    private InsertStatement channelResultsInsertQuery;
+    private String subscriptionsTableName;
+    private String resultsTableName;
+
+    public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
+            Expression period) {
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.function = function;
+        this.period = (CallExpr) period;
+        this.duration = "";
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public String getResultsName() {
+        return resultsTableName;
+    }
+
+    public String getSubscriptionsName() {
+        return subscriptionsTableName;
+    }
+
+    public String getDuration() {
+        return duration;
+    }
+
+    public FunctionSignature getFunction() {
+        return function;
+    }
+
+    public Expression getPeriod() {
+        return period;
+    }
+
+    public InsertStatement getChannelResultsInsertQuery() {
+        return channelResultsInsertQuery;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName)
+            throws MetadataException, HyracksDataException {
+        Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
+        if (lookup == null) {
+            throw new MetadataException(" Unknown function " + function.getName());
+        }
+
+        if (!period.getFunctionSignature().getName().equals("duration")) {
+            throw new MetadataException(
+                    "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
+        }
+        duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
+        IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream outputStream = new DataOutputStream(bos);
+        durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
+        this.resultsTableName = resultsTableName;
+        this.subscriptionsTableName = subscriptionsTableName;
+
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverse,
+            String channelName, String duration, AqlMetadataProvider metadataProvider, JobSpecification channeljobSpec,
+            String strIP, int port) throws Exception {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IOperatorDescriptor channelQueryExecuter;
+        AlgebricksPartitionConstraint executerPc;
+
+        Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> p = buildChannelRuntime(spec, dataverse,
+                channelName, duration, channeljobSpec, strIP, port);
+        channelQueryExecuter = p.first;
+        executerPc = p.second;
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, channelQueryExecuter, executerPc);
+        spec.addRoot(channelQueryExecuter);
+        return new Pair<>(spec, p.second);
+
+    }
+
+    public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> buildChannelRuntime(
+            JobSpecification jobSpec, String dataverse, String channelName, String duration,
+            JobSpecification channeljobSpec, String strIP, int port) throws Exception {
+        RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
+                channelName, duration, channeljobSpec, strIP, port);
+
+        String partition = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
+        Set<String> ncs = new HashSet<>(Arrays.asList(partition));
+        AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                ncs.toArray(new String[ncs.size()]));
+        return new Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>(channelOp, partitionConstraint);
+    }
+
+    private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+            Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, Stats stats, String dataverse) throws AsterixException, Exception {
+
+        Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
+        Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
+        //Setup the subscriptions dataset
+        List<List<String>> partitionFields = new ArrayList<List<String>>();
+        List<Integer> keyIndicators = new ArrayList<Integer>();
+        keyIndicators.add(0);
+        List<String> fieldNames = new ArrayList<String>();
+        fieldNames.add(BADConstants.SubscriptionId);
+        partitionFields.add(fieldNames);
+        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
+        DatasetDecl createSubscriptionsDataset = new DatasetDecl(new Identifier(dataverse), subscriptionsName,
+                new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, null,
+                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+
+        //Setup the results dataset
+        partitionFields = new ArrayList<List<String>>();
+        fieldNames = new ArrayList<String>();
+        fieldNames.add(BADConstants.ResultId);
+        partitionFields.add(fieldNames);
+        idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
+        DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
+                new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
+                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+
+        //Run both statements to create datasets
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+                hcc);
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
+
+    }
+
+    private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+            Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
+        StringBuilder builder = new StringBuilder();
+        builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
+        builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
+
+        builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
+        builder.append(
+                "for $broker in dataset " + BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + "\n");
+        builder.append("where $broker." + BADConstants.BrokerName + "= $sub." + BADConstants.BrokerName + "\n");
+        builder.append("and $broker." + BADConstants.DataverseName + "= $sub." + BADConstants.DataverseName + "\n");
+        builder.append(" for $result in " + function.getNamespace() + "." + function.getName() + "(");
+        int i = 0;
+        for (; i < function.getArity() - 1; i++) {
+            builder.append("$sub.param" + i + ",");
+        }
+        builder.append("$sub.param" + i + ")\n");
+        builder.append("return {\n");
+        builder.append("\"" + BADConstants.ChannelExecutionTime + "\":$" + BADConstants.ChannelExecutionTime + ",");
+        builder.append("\"" + BADConstants.SubscriptionId + "\":$sub." + BADConstants.SubscriptionId + ",");
+        builder.append("\"" + BADConstants.DeliveryTime + "\":current-datetime(),");
+        builder.append("\"result\":$result");
+        builder.append("}");
+        builder.append(")");
+        builder.append(" return records");
+        builder.append(";");
+        AQLParserFactory aqlFact = new AQLParserFactory();
+        List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
+        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+                hcc, hdc, ResultDelivery.ASYNC, stats, true);
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        //This function performs three tasks:
+        //1. Create datasets for the Channel
+        //2. Create the compiled Channel Job
+        //3. Create the metadata entry for the channel
+
+        //TODO: Figure out how to handle when a subset of the 3 tasks fails
+        //TODO: The compiled job will break if anything changes to the function or two datasets
+        // Need to make sure we do proper checking when altering these things
+
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+
+        Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
+        Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
+        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+                .getActiveEntityListener(entityId);
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+        boolean subscriberRegistered = false;
+        Channel channel = null;
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            if (channel != null) {
+                throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
+            }
+            if (listener != null) {
+                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+            }
+            if (subscriberRegistered) {
+                throw new AsterixException("Channel " + channelName + " is already running");
+            }
+            initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
+            channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+                    duration);
+
+            //check if names are available before creating anything
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+
+            // Now we subscribe
+            if (listener == null) {
+                listener = new ChannelEventsListener(entityId);
+                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            }
+            listener.registerEventSubscriber(eventSubscriber);
+            subscriberRegistered = true;
+
+            //Create Channel Datasets
+            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+                    dataverse);
+
+            //Create Channel Internal Job
+            JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
+                    metadataProvider, hcc, hdc, stats, dataverse);
+
+            //Create Channel Operator
+            ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
+            ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
+            String strIP = ccInfo.getClientNetAddress();
+            int port = ccInfo.getClientNetPort();
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(
+                    dataverse, channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
+
+            ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE,
+                    alteredJobSpec.first);
+            alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+            JobUtils.runJob(hcc, alteredJobSpec.first, false);
+
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+            MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (mdTxnCtx != null) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            LOGGER.log(Level.WARNING, "Failed creating a channel", e);
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
new file mode 100644
index 0000000..05ab4c6
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.rmi.RemoteException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.api.IMetadataExtension;
+import org.apache.asterix.metadata.api.IMetadataIndex;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class BADMetadataExtension implements IMetadataExtension {
+
+    public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
+            BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+    public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
+            NonTaggedDataFormat.class.getName(), IMetadataEntity.PENDING_NO_OP);
+
+    public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+    public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
+
+    public static final Datatype BAD_BROKER_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.RECORD_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
+
+    public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
+
+    @Override
+    public ExtensionId getId() {
+        return BAD_METADATA_EXTENSION_ID;
+    }
+
+    @Override
+    public void configure(List<Pair<String, String>> args) {
+        // do nothing??
+    }
+
+    @Override
+    public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
+        return new MetadataTupleTranslatorProvider();
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public List<ExtensionMetadataDataset> getExtensionIndexes() {
+        try {
+            return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+    }
+
+    @Override
+    public void initializeMetadata() throws HyracksDataException, RemoteException, ACIDException {
+        // enlist datasets
+        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.CHANNEL_DATASET);
+        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.BROKER_DATASET);
+        if (MetadataBootstrap.isNewUniverse()) {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            try {
+                // add metadata datasets
+                MetadataBootstrap.insertMetadataDatasets(mdTxnCtx,
+                        new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET });
+                // insert default dataverse
+                // TODO prevent user from dropping this dataverse
+                // MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
+                // insert default data type
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
+                // TODO prevent user from dropping these types
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e) {
+                e.printStackTrace();
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+        }
+        // local recovery?
+        // nothing for now
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
new file mode 100644
index 0000000..848fe78
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.Arrays;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+public class BADMetadataIndexes {
+
+    public static final ExtensionMetadataDatasetId BAD_CHANNEL_INDEX_ID = new ExtensionMetadataDatasetId(
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.CHANNEL_EXTENSION_NAME);
+    public static final MetadataIndexImmutableProperties PROPERTIES_CHANNEL = new MetadataIndexImmutableProperties(
+            BADConstants.CHANNEL_EXTENSION_NAME,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID);
+
+    public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID = new ExtensionMetadataDatasetId(
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.BROKER_KEYWORD);
+    public static final MetadataIndexImmutableProperties PROPERTIES_BROKER = new MetadataIndexImmutableProperties(
+            BADConstants.BROKER_KEYWORD,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
+
+    public static final int NUM_FIELDS_CHANNEL_IDX = 3;
+    public static final int NUM_FIELDS_BROKER_IDX = 3;
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
+            NUM_FIELDS_CHANNEL_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                    Arrays.asList(BADConstants.ChannelName)),
+            0, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, true, new int[] { 0, 1 }, BAD_CHANNEL_INDEX_ID,
+            new ChannelTupleTranslator(true));
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset(PROPERTIES_BROKER,
+            NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                    Arrays.asList(BADConstants.BrokerName)),
+            0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
+            new BrokerTupleTranslator(true));
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
new file mode 100644
index 0000000..cec98d0
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+public class BADMetadataRecordTypes {
+
+    // -------------------------------------- Subscriptions --------------------------------------//
+    private static final String[] subTypeFieldNames = { BADConstants.DataverseName, BADConstants.BrokerName,
+            BADConstants.SubscriptionId };
+    private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+    public static final ARecordType channelSubscriptionsType = new ARecordType(BADConstants.ChannelSubscriptionsType,
+            subTypeFieldNames, subTypeFieldTypes, true);
+
+    // ---------------------------------------- Results --------------------------------------------//
+    private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
+            BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+    private static final IAType[] resultTypeFieldTypes = { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID,
+            BuiltinType.ADATETIME };
+    public static final ARecordType channelResultsType = new ARecordType(BADConstants.ChannelResultsType,
+            resultTypeFieldNames, resultTypeFieldTypes, true);
+
+    //------------------------------------------ Channel ----------------------------------------//     
+    public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
+    public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+    public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
+    public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
+    public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
+    public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
+            // RecordTypeName
+            BADConstants.RECORD_TYPENAME_CHANNEL,
+            // FieldNames
+            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+                    BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration },
+            // FieldTypes
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
+            //IsOpen?
+            true);
+    //------------------------------------------ Broker ----------------------------------------//
+    public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int BROKER_NAME_FIELD_INDEX = 1;
+    public static final int BROKER_ENDPOINT_FIELD_INDEX = 2;
+    public static final ARecordType BROKER_RECORDTYPE = MetadataRecordTypes.createRecordType(
+            // RecordTypeName
+            BADConstants.RECORD_TYPENAME_BROKER,
+            // FieldNames
+            new String[] { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.BrokerEndPoint },
+            // FieldTypes
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
+            //IsOpen?
+            true);
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/src/main/java/org/apache/asterix/bad/metadata/Broker.java
new file mode 100644
index 0000000..006f0dc
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/Broker.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a broker.
+ */
+public class Broker implements IExtensionMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String dataverseName;
+    private final String brokerName;
+    private final String endPointName;
+
+    public Broker(String dataverseName, String brokerName, String endPointName) {
+        this.endPointName = endPointName;
+        this.dataverseName = dataverseName;
+        this.brokerName = brokerName;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getEndPointName() {
+        return endPointName;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Broker)) {
+            return false;
+        }
+        Broker otherDataset = (Broker) other;
+        if (!otherDataset.brokerName.equals(brokerName)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
new file mode 100644
index 0000000..b73e9e3
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class BrokerSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse;
+    private final String broker;
+
+    public BrokerSearchKey(String dataverse, String broker) {
+        this.dataverse = dataverse;
+        this.broker = broker;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+    }
+
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(dataverse, broker);
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
new file mode 100644
index 0000000..0a37c02
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Channel metadata entity to an ITupleReference and vice versa.
+ */
+public class BrokerTupleTranslator extends AbstractTupleTranslator<Broker> {
+    // Field indexes of serialized Broker in a tuple.
+    // Key field.
+    public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+    public static final int BROKER_NAME_FIELD_INDEX = 1;
+
+    // Payload field containing serialized broker.
+    public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+    @SuppressWarnings("unchecked")
+    public BrokerTupleTranslator(boolean getTuple) {
+        super(getTuple, BADMetadataIndexes.NUM_FIELDS_BROKER_IDX);
+    }
+
+    @Override
+    public Broker getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+        byte[] serRecord = frameTuple.getFieldData(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord channelRecord = recordSerDes.deserialize(in);
+        return createBrokerFromARecord(channelRecord);
+    }
+
+    private Broker createBrokerFromARecord(ARecord brokerRecord) {
+        Broker broker = null;
+        String dataverseName = ((AString) brokerRecord
+                .getValueByPos(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+        String brokerName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX))
+                .getStringValue();
+        String endPointName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX))
+                .getStringValue();
+
+        broker = new Broker(dataverseName, brokerName, endPointName);
+        return broker;
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(Broker broker) throws IOException, MetadataException {
+        // write the key in the first fields of the tuple
+
+        tupleBuilder.reset();
+        aString.setValue(broker.getDataverseName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        aString.setValue(broker.getBrokerName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        recordBuilder.reset(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(broker.getDataverseName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(broker.getBrokerName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 2
+        fieldValue.reset();
+        aString.setValue(broker.getEndPointName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX, fieldValue);
+
+        // write record
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/src/main/java/org/apache/asterix/bad/metadata/Channel.java
new file mode 100644
index 0000000..b201af6
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a channel.
+ */
+public class Channel implements IExtensionMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    /** A unique identifier for the channel */
+    protected final EntityId channelId;
+    private final String subscriptionsDatasetName;
+    private final String resultsDatasetName;
+    private final String duration;
+    private final FunctionSignature function;
+
+    public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+            FunctionSignature function, String duration) {
+        this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.function = function;
+        this.duration = duration;
+        this.resultsDatasetName = resultsDataset;
+        this.subscriptionsDatasetName = subscriptionsDataset;
+    }
+
+    public EntityId getChannelId() {
+        return channelId;
+    }
+
+    public String getSubscriptionsDataset() {
+        return subscriptionsDatasetName;
+    }
+
+    public String getResultsDatasetName() {
+        return resultsDatasetName;
+    }
+
+    public String getDuration() {
+        return duration;
+    }
+
+    public FunctionSignature getFunction() {
+        return function;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Channel)) {
+            return false;
+        }
+        Channel otherDataset = (Channel) other;
+        if (!otherDataset.channelId.equals(channelId)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
new file mode 100644
index 0000000..82c97c8
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.log4j.Logger;
+
+public class ChannelEventsListener implements IActiveEntityEventsListener {
+    private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);
+    private final List<IActiveLifecycleEventSubscriber> subscribers;
+    private final Map<Long, ActiveJob> jobs;
+    private final Map<EntityId, ChannelJobInfo> jobInfos;
+    private EntityId entityId;
+
+    public ChannelEventsListener(EntityId entityId) {
+        this.entityId = entityId;
+        subscribers = new ArrayList<>();
+        jobs = new HashMap<>();
+        jobInfos = new HashMap<>();
+    }
+
+    @Override
+    public void notify(ActiveEvent event) {
+        try {
+            switch (event.getEventKind()) {
+                case JOB_START:
+                    handleJobStartEvent(event);
+                    break;
+                case JOB_FINISH:
+                    handleJobFinishEvent(event);
+                    break;
+                default:
+                    LOGGER.warn("Unknown Channel Event" + event);
+                    break;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unhandled Exception", e);
+        }
+    }
+
+    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        handleJobStartMessage((ChannelJobInfo) jobInfo);
+    }
+
+    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Channel Job finished for  " + jobInfo);
+        }
+        handleJobFinishMessage((ChannelJobInfo) jobInfo);
+    }
+
+    private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
+        EntityId channelJobId = cInfo.getEntityId();
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        JobStatus status = info.getStatus();
+        boolean failure = status != null && status.equals(JobStatus.FAILURE);
+
+        jobInfos.remove(channelJobId);
+        jobs.remove(cInfo.getJobId().getId());
+        // notify event listeners
+        ActiveLifecycleEvent event = failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED
+                : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
+        notifyEventSubscribers(event);
+    }
+
+    private void notifyEventSubscribers(ActiveLifecycleEvent event) {
+        if (subscribers != null && !subscribers.isEmpty()) {
+            for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+                subscriber.handleEvent(event);
+            }
+        }
+    }
+
+    private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
+        List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
+        Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
+                channelOperatorIds.add(opDesc.getOperatorId());
+            }
+        }
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        List<String> locations = new ArrayList<>();
+        for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
+            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
+            int nOperatorInstances = operatorLocations.size();
+            for (int i = 0; i < nOperatorInstances; i++) {
+                locations.add(operatorLocations.get(i));
+            }
+        }
+        cInfo.setLocations(locations);
+        cInfo.setState(ActivityState.ACTIVE);
+    }
+
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
+        EntityId channelId = null;
+        try {
+            for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+                if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
+                    channelId = ((RepetitiveChannelOperatorDescriptor) opDesc).getEntityId();
+                    registerJob(channelId, jobId, spec);
+                    return;
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error(e);
+        }
+    }
+
+    public synchronized void registerJob(EntityId entityId, JobId jobId, JobSpecification jobSpec) {
+        if (jobs.get(jobId.getId()) != null) {
+            throw new IllegalStateException("Channel job already registered");
+        }
+        if (jobInfos.containsKey(jobId.getId())) {
+            throw new IllegalStateException("Channel job already registered");
+        }
+
+        ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
+        jobs.put(jobId.getId(), cInfo);
+        jobInfos.put(entityId, cInfo);
+
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
+        }
+
+        notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+    }
+
+    public JobSpecification getJobSpecification(EntityId activeJobId) {
+        return jobInfos.get(activeJobId).getSpec();
+    }
+
+    public ChannelJobInfo getJobInfo(EntityId activeJobId) {
+        return jobInfos.get(activeJobId);
+    }
+
+    public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+        subscribers.add(subscriber);
+    }
+
+    public void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+        subscribers.remove(subscriber);
+    }
+
+    public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
+        boolean active = false;
+        ChannelJobInfo cInfo = jobInfos.get(activeJobId);
+        if (cInfo != null) {
+            active = cInfo.getState().equals(ActivityState.ACTIVE);
+        }
+        if (active) {
+            registerEventSubscriber(eventSubscriber);
+        }
+        return active;
+    }
+
+    public FeedConnectionId[] getConnections() {
+        return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
+    }
+
+    @Override
+    public boolean isEntityActive() {
+        return !jobs.isEmpty();
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
+        if (entityId.getDataverse().equals(dataverseName)) {
+            String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
+            String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
+            if (datasetName.equals(subscriptionsName) || datasetName.equals(resultsName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
new file mode 100644
index 0000000..679548c
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ChannelSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse;
+    private final String channel;
+
+    public ChannelSearchKey(String dataverse, String channel) {
+        this.dataverse = dataverse;
+        this.channel = channel;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+    }
+
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(dataverse, channel);
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
new file mode 100644
index 0000000..18b2067
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Channel metadata entity to an ITupleReference and vice versa.
+ */
+public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
+    // Field indexes of serialized Feed in a tuple.
+    // Key field.
+    public static final int CHANNEL_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+    public static final int CHANNEL_NAME_FIELD_INDEX = 1;
+
+    // Payload field containing serialized feed.
+    public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
+
+    @SuppressWarnings("unchecked")
+    public ChannelTupleTranslator(boolean getTuple) {
+        super(getTuple, BADMetadataIndexes.NUM_FIELDS_CHANNEL_IDX);
+    }
+
+    @Override
+    public Channel getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+        byte[] serRecord = frameTuple.getFieldData(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord channelRecord = recordSerDes.deserialize(in);
+        return createChannelFromARecord(channelRecord);
+    }
+
+    private Channel createChannelFromARecord(ARecord channelRecord) {
+        Channel channel = null;
+        String dataverseName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+        String channelName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX)).getStringValue();
+        String subscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+        String resultsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX)).getStringValue();
+        String fName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)).getStringValue();
+        String duration = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX)).getStringValue();
+
+        FunctionSignature signature = null;
+
+        String[] qnameComponents = fName.split("\\.");
+        String functionDataverse;
+        String functionName;
+        if (qnameComponents.length == 2) {
+            functionDataverse = qnameComponents[0];
+            functionName = qnameComponents[1];
+        } else {
+            functionDataverse = dataverseName;
+            functionName = qnameComponents[0];
+        }
+
+        String[] nameComponents = functionName.split("@");
+        signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1]));
+
+        channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration);
+        return channel;
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(Channel channel) throws IOException, MetadataException {
+        // write the key in the first fields of the tuple
+
+        tupleBuilder.reset();
+        aString.setValue(channel.getChannelId().getDataverse());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        aString.setValue(channel.getChannelId().getEntityName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        recordBuilder.reset(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(channel.getChannelId().getDataverse());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(channel.getChannelId().getEntityName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 2
+        fieldValue.reset();
+        aString.setValue(channel.getSubscriptionsDataset());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 3
+        fieldValue.reset();
+        aString.setValue(channel.getResultsDatasetName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 4
+        fieldValue.reset();
+        aString.setValue(channel.getFunction().toString());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
+
+        // write field 5
+        fieldValue.reset();
+        aString.setValue(channel.getDuration());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX, fieldValue);
+
+        // write record
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
new file mode 100644
index 0000000..8e19fc0
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
+import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            return false;
+        }
+        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+            return false;
+        }
+        ExtensionOperator eOp = (ExtensionOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+        if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+            return false;
+        }
+        InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
+        if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
+            return false;
+        }
+        DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+        String datasetName = dds.getDataset().getDatasetName();
+        if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+                || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
+                || !datasetName.endsWith("Results")) {
+            return false;
+        }
+        String channelDataverse = dds.getDataset().getDataverseName();
+        //Now we know that we are inserting into results
+
+        String channelName = datasetName.substring(0, datasetName.length() - 7);
+        String subscriptionsName = channelName + "Subscriptions";
+        //TODO: Can we check here to see if there is a channel with such a name?
+
+        DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+        if (subscriptionsScan == null) {
+            return false;
+        }
+
+        //Now we want to make sure and set the commit to be a nonsink commit
+        ((CommitOperator) eOp.getDelegate()).setSink(false);
+
+        //Now we need to get the broker EndPoint 
+        LogicalVariable brokerEndpointVar = context.newVar();
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
+        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
+        //now brokerNameVar holds the brokerName for use farther up in the plan
+
+        //Place assignOp between the scan and the op above it
+        assignOp.getInputs().addAll(opAboveBrokersScan.getInputs());
+        opAboveBrokersScan.getInputs().clear();
+        opAboveBrokersScan.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
+        context.computeAndSetTypeEnvironmentForOperator(eOp);
+
+        //get subscriptionIdVar
+        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+
+        //The channelExecutionTime is created just before the scan
+        LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
+                .getVariables().get(0);
+
+        ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
+        badProject.getVariables().add(subscriptionIdVar);
+        badProject.getVariables().add(brokerEndpointVar);
+        badProject.getVariables().add(channelExecutionVar);
+        context.computeAndSetTypeEnvironmentForOperator(badProject);
+
+        //Create my brokerNotify plan above the extension Operator
+        ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
+                context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
+
+        opRef.setValue(dOp);
+
+        return true;
+    }
+
+    private ExtensionOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
+            LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
+            ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+                    throws AlgebricksException {
+        //create the Distinct Op
+        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
+        VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar);
+        expressions.add(new MutableObject<ILogicalExpression>(vExpr));
+        DistinctOperator distinctOp = new DistinctOperator(expressions);
+
+        //create the GroupBy Op
+        //And set the distinct as input
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
+
+        //create group by operator
+        GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
+        groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
+
+        //create nested plan for subscription ids in group by
+        NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
+                new MutableObject<ILogicalOperator>(groupbyOp));
+        //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
+        //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
+        LogicalVariable subscriptionListVar = context.newVar();
+        List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
+        aggVars.add(subscriptionListVar);
+        AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
+                AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
+        funAgg.getArguments()
+                .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
+        List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
+        AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
+        listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
+
+        //add nested plans
+        nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
+
+        //Create the NotifyBrokerOperator
+        NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
+                channelExecutionVar);
+        EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
+        NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
+        notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
+        ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp);
+        extensionOp.setPhysicalOperator(notifyBrokerPOp);
+        extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
+
+        //Set the input for the brokerNotify as the replicate operator
+        distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+
+        //compute environment bottom up
+
+        context.computeAndSetTypeEnvironmentForOperator(distinctOp);
+        context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
+        context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
+        context.computeAndSetTypeEnvironmentForOperator(listifyOp);
+        context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+        return extensionOp;
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
+            AbstractLogicalOperator opAboveBrokersScan) {
+        Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
+                new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
+        DataSourceScanOperator brokerScan = null;
+        for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
+            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                brokerScan = (DataSourceScanOperator) subOp.getValue();
+            }
+        }
+        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
+                new VariableReferenceExpression(brokerScan.getVariables().get(2)));
+
+        ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
+        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
+        varArray.add(brokerEndpointVar);
+        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
+        exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
+        return new AssignOperator(varArray, exprArray);
+    }
+
+    /*This function searches for the needed op
+     * If lookingForBrokers, find the op above the brokers scan
+     * Else find the suscbriptionsScan
+     */
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+        if (!op.hasInputs()) {
+            return null;
+        }
+        for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
+            if (lookingForString.equals("brokers")) {
+                if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                    return op;
+                } else {
+                    AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+                            lookingForString);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
+
+            } else if (lookingForString.equals("project")) {
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                    return (AbstractLogicalOperator) subOp.getValue();
+                } else {
+                    AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+                            lookingForString);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
+            }
+
+            else {
+                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+                    return (AbstractLogicalOperator) subOp.getValue();
+                } else {
+                    AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+                            lookingForString);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
+
+            }
+        }
+        return null;
+    }
+
+    private boolean isBrokerScan(AbstractLogicalOperator op) {
+        if (op instanceof DataSourceScanOperator) {
+            if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
+                DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+                if (dds.getDataset().getDataverseName().equals("Metadata")
+                        && dds.getDataset().getDatasetName().equals("Broker")) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+        if (op instanceof DataSourceScanOperator) {
+            if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
+                DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+                if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+                        && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+                    if (dds.getDataset().getDatasetName().equals(subscriptionsName)) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
new file mode 100644
index 0000000..c680988
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.Collection;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
+    private final LogicalVariable subscriptionIdVar;
+    private final LogicalVariable brokerEndpointVar;
+    private final LogicalVariable channelExecutionVar;
+
+    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
+            LogicalVariable resultSetVar) {
+        this.brokerEndpointVar = brokerEndpointVar;
+        this.subscriptionIdVar = subscriptionIdVar;
+        this.channelExecutionVar = resultSetVar;
+    }
+
+    public LogicalVariable getSubscriptionVariable() {
+        return subscriptionIdVar;
+    }
+
+    public LogicalVariable getBrokerEndpointVariable() {
+        return brokerEndpointVar;
+    }
+
+    public LogicalVariable getChannelExecutionVariable() {
+        return channelExecutionVar;
+    }
+
+    @Override
+    public String toString() {
+        return "notify-brokers";
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public IOperatorExtension newInstance() {
+        return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public void getUsedVariables(Collection<LogicalVariable> usedVars) {
+        usedVars.add(subscriptionIdVar);
+        usedVars.add(brokerEndpointVar);
+        usedVars.add(channelExecutionVar);
+    }
+
+    @Override
+    public void getProducedVariables(Collection<LogicalVariable> producedVars) {
+        // none produced
+
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
new file mode 100644
index 0000000..753ece7
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class NotifyBrokerPOperator extends AbstractPhysicalOperator {
+
+    private final EntityId entityId;
+
+    public NotifyBrokerPOperator(EntityId entityId) {
+        this.entityId = entityId;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    @Override
+    public String toString() {
+        return "NOTIFY_BROKERS";
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+                    throws AlgebricksException {
+        ExtensionOperator notify = (ExtensionOperator) op;
+        LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
+        LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
+        LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+
+        int brokerColumn = inputSchemas[0].findVariable(brokerVar);
+        int subColumn = inputSchemas[0].findVariable(subVar);
+        int executionColumn = inputSchemas[0].findVariable(executionVar);
+
+        IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
+        IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
+        IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
+
+        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
+                channelExecutionEvalFactory, entityId);
+
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+
+        builder.contributeMicroOperator(op, runtime, recDesc);
+
+        // and contribute one edge from its child
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, notify, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
new file mode 100644
index 0000000..d55080c
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+    private final DataInputStream di = new DataInputStream(bbis);
+    private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
+            new AOrderedListType(BuiltinType.AUUID, null));
+
+    private IPointable inputArg0 = new VoidPointable();
+    private IPointable inputArg1 = new VoidPointable();
+    private IPointable inputArg2 = new VoidPointable();
+    private IScalarEvaluator eval0;
+    private IScalarEvaluator eval1;
+    private IScalarEvaluator eval2;
+    private final ActiveManager activeManager;
+    private final EntityId entityId;
+    private ChannelJobService channelJobService;
+
+    public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
+            IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId activeJobId) throws AlgebricksException {
+        this.tRef = new FrameTupleReference();
+        eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
+        eval1 = subEvalFactory.createScalarEvaluator(ctx);
+        eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
+        this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getActiveManager();
+        this.entityId = activeJobId;
+        channelJobService = new ChannelJobService();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            tRef.reset(tAccess, t);
+            try {
+                eval0.evaluate(tRef, inputArg0);
+                eval1.evaluate(tRef, inputArg1);
+                eval2.evaluate(tRef, inputArg2);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+            int serBrokerOffset = inputArg0.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
+            AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
+
+            int serSubOffset = inputArg1.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
+            AOrderedList subs = subSerDes.deserialize(di);
+
+            int resultSetOffset = inputArg2.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
+            ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+            String executionTimeString = executionTime.toSimpleString();
+
+            channelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
+                    executionTimeString);
+
+        }
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
new file mode 100644
index 0000000..d5452d4
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IScalarEvaluatorFactory brokerEvalFactory;
+    private final IScalarEvaluatorFactory subEvalFactory;
+    private final IScalarEvaluatorFactory channelExecutionEvalFactory;
+    private final EntityId entityId;
+
+    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
+            IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+        this.brokerEvalFactory = brokerEvalFactory;
+        this.subEvalFactory = subEvalFactory;
+        this.channelExecutionEvalFactory = channelExecutionEvalFactory;
+        this.entityId = entityId;
+    }
+
+    @Override
+    public String toString() {
+        return "notify-broker";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
+    }
+}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
new file mode 100644
index 0000000..f3b0a90
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
+
+    /** The unique identifier of the job. **/
+    protected final EntityId entityId;
+
+    protected final JobSpecification jobSpec;
+
+    private final String duration;
+
+    private String strIP;
+    private int port;
+
+    public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
+            String duration, JobSpecification channeljobSpec, String strIP, int port) {
+        super(spec, 0, 0);
+        this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.jobSpec = channeljobSpec;
+        this.duration = duration;
+        this.strIP = strIP;
+        this.port = port;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
+                RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition);
+        try {
+            return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
+        } catch (AsterixException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public String getDuration() {
+        return duration;
+    }
+
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    public JobSpecification getJobSpec() {
+        return jobSpec;
+    }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
new file mode 100644
index 0000000..873d2e7
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
+
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+    private final JobSpecification jobSpec;
+    private long duration;
+    private ChannelJobService channelJobService;
+    private String strIP;
+    private int port;
+
+    public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
+            JobSpecification channeljobSpec, String duration, String strIP, int port) throws AsterixException {
+        super(ctx, runtimeId);
+        this.jobSpec = channeljobSpec;
+        this.duration = findPeriod(duration);
+        //TODO: we should share channelJobService as a single instance
+        //And only create one hcc
+        channelJobService = new ChannelJobService();
+        this.strIP = strIP;
+        this.port = port;
+    }
+
+    public void executeJob() throws Exception {
+        LOGGER.info("Executing Job: " + runtimeId.toString());
+        channelJobService.runChannelJob(jobSpec, strIP, port);
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void start() throws HyracksDataException, InterruptedException {
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    executeJob();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+
+        while (!scheduledExecutorService.isTerminated()) {
+
+        }
+
+    }
+
+    @Override
+    protected void abort() throws HyracksDataException, InterruptedException {
+        scheduledExecutorService.shutdown();
+    }
+
+    private long findPeriod(String duration) {
+        //TODO: Allow Repetitive Channels to use YMD durations  
+        String hoursMinutesSeconds = "";
+        if (duration.indexOf('T') != -1) {
+            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+        }
+        double seconds = 0;
+        if (hoursMinutesSeconds != "") {
+            int pos = 0;
+            if (hoursMinutesSeconds.indexOf('H') != -1) {
+                Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+                seconds += (hours * 60 * 60);
+                pos = hoursMinutesSeconds.indexOf('H') + 1;
+
+            }
+            if (hoursMinutesSeconds.indexOf('M') != -1) {
+                Double minutes = Double
+                        .parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+                seconds += (minutes * 60);
+                pos = hoursMinutesSeconds.indexOf('M') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('S') != -1) {
+                Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+                seconds += (s);
+            }
+
+        }
+        return (long) (seconds * 1000);
+    }
+
+}
diff --git a/src/main/resources/lang-extension/lang.txt b/src/main/resources/lang-extension/lang.txt
new file mode 100644
index 0000000..233ec97
--- /dev/null
+++ b/src/main/resources/lang-extension/lang.txt
@@ -0,0 +1,178 @@
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
+import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
+import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
+import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+
+
+@merge
+Statement SingleStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | stmt = ChannelSubscriptionStatement())
+  {
+    // merge area 3
+  }
+}
+
+@merge
+Statement CreateStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | stmt = ChannelSpecification() | stmt = BrokerSpecification())
+  {
+    // merge area 3
+  }
+}
+
+@merge
+Statement DropStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | "channel" pairId = QualifiedName() ifExists = IfExists()
+      {
+        stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
+      }
+      	      | <BROKER> pairId = QualifiedName() ifExists = IfExists()	
+      {	
+        stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);	
+      }
+      )
+  {
+    // merge area 3
+  }
+}
+
+@new
+CreateChannelStatement ChannelSpecification() throws ParseException:
+{
+  Pair<Identifier,Identifier> nameComponents = null;
+  FunctionSignature appliedFunction = null;
+  CreateChannelStatement ccs = null;
+  String fqFunctionName = null;
+  Expression period = null;
+}
+{
+  (
+    "repetitive" "channel"  nameComponents = QualifiedName()
+    <USING> appliedFunction = FunctionSignature()
+    "period" period = FunctionCallExpr()
+    {
+      ccs = new CreateChannelStatement(nameComponents.first,
+                                   nameComponents.second, appliedFunction, period);
+    }
+  )
+    {
+      return ccs;
+    }
+}
+
+@new
+CreateBrokerStatement BrokerSpecification() throws ParseException:
+{
+  CreateBrokerStatement cbs = null;
+  Pair<Identifier,Identifier> name = null;
+  String endPoint = null;
+}
+{
+  (
+    <BROKER>  name = QualifiedName()
+    <AT>  endPoint = StringLiteral()
+    {
+      cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+    }
+  )
+    {
+      return cbs;
+    }
+}
+
+@new
+Statement ChannelSubscriptionStatement() throws ParseException:
+{
+  Statement stmt = null;
+  Pair<Identifier,Identifier> nameComponents = null;
+  List<Expression> argList = new ArrayList<Expression>();
+  Expression tmp = null;
+  String id = null;
+  String subscriptionId = null;
+  Pair<Identifier,Identifier> brokerName = null;
+}
+{
+  (
+  "subscribe" <TO> nameComponents = QualifiedName()
+   <LEFTPAREN> (tmp = Expression()
+   {
+      argList.add(tmp);
+   }
+   (<COMMA> tmp = Expression()
+   {
+      argList.add(tmp);
+   }
+   )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
+   {
+      stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+   }
+   | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+      {
+        setDataverses(new ArrayList<String>());
+        setDatasets(new ArrayList<String>());
+        VariableExpr varExp = new VariableExpr();
+        VarIdentifier var = new VarIdentifier();
+        varExp.setVar(var);
+        var.setValue("$subscriptionPlaceholder");
+        getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
+        List<String> dataverses = getDataverses();
+        List<String> datasets = getDatasets();
+        // we remove the pointer to the dataverses and datasets
+        setDataverses(null);
+        setDatasets(null);
+        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+      }
+     | "change" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents = QualifiedName()
+       <LEFTPAREN> (tmp = Expression()
+       {
+         argList.add(tmp);
+       }
+       (<COMMA> tmp = Expression()
+       {
+         argList.add(tmp);
+       }
+       )*)? <RIGHTPAREN>
+        <TO> brokerName = QualifiedName()
+      {
+        stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+      }
+    )
+    {
+      return stmt;
+    }
+}
+
+<DEFAULT,IN_DBL_BRACE>
+TOKEN [IGNORE_CASE]:
+{
+    <BROKER : "broker">
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
new file mode 100644
index 0000000..77e8afe
--- /dev/null
+++ b/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime test cases under 'src/test/resources/runtimets'.
+ */
+@RunWith(Parameterized.class)
+public class BADExecutionTest {
+
+    protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName());
+
+    protected static final String PATH_ACTUAL = "target/rttest" + File.separator;
+    protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
+            File.separator);
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+
+    protected static AsterixTransactionProperties txnProperties;
+    private static final TestExecutor testExecutor = new TestExecutor();
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    protected static TestGroup FailedGroup;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+        ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
+    }
+
+    @Parameters(name = "BADExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return buildTestsInXml("testsuite.xml");
+    }
+
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        TestCaseContext.Builder b = new TestCaseContext.Builder();
+        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+            testArgs.add(new Object[] { ctx });
+        }
+        return testArgs;
+
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public BADExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
+    }
+}
diff --git a/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
new file mode 100644
index 0000000..4949b34
--- /dev/null
+++ b/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.test.optimizer.OptimizerTest;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BADOptimizerTest extends OptimizerTest {
+
+    private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName());
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        final File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+
+        integrationUtil.init(true);
+        // Set the node resolver to be the identity resolver that expects node names
+        // to be node controller ids; a valid assumption in test environment.
+        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
+                IdentitiyResolverFactory.class.getName());
+    }
+
+    public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) {
+        super(queryFile, expectedFile, actualFile);
+    }
+
+}
diff --git a/src/test/resources/conf/asterix-build-configuration.xml b/src/test/resources/conf/asterix-build-configuration.xml
new file mode 100644
index 0000000..c2f5d41
--- /dev/null
+++ b/src/test/resources/conf/asterix-build-configuration.xml
@@ -0,0 +1,110 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+  <metadataNode>asterix_nc1</metadataNode>
+  <store>
+    <ncId>asterix_nc1</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <store>
+    <ncId>asterix_nc2</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <transactionLogDir>
+    <ncId>asterix_nc1</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+  </transactionLogDir>
+  <transactionLogDir>
+    <ncId>asterix_nc2</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+  </transactionLogDir>
+  <extensions>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+    </extension>
+  </extensions>
+  <property>
+    <name>max.wait.active.cluster</name>
+    <value>60</value>
+    <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+      nodes are available)
+      before a submitted query/statement can be
+      executed. (Default = 60 seconds)
+    </description>
+  </property>
+  <property>
+    <name>log.level</name>
+    <value>WARNING</value>
+    <description>Log level for running tests/build</description>
+  </property>
+  <property>
+    <name>compiler.framesize</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>compiler.sortmemory</name>
+    <value>327680</value>
+  </property>
+  <property>
+    <name>compiler.groupmemory</name>
+    <value>163840</value>
+  </property>
+  <property>
+    <name>compiler.joinmemory</name>
+    <value>163840</value>
+  </property>
+  <property>
+    <name>compiler.pregelix.home</name>
+    <value>~/pregelix</value>
+  </property>
+  <property>
+    <name>storage.buffercache.pagesize</name>
+    <value>32768</value>
+    <description>The page size in bytes for pages in the buffer cache.
+      (Default = "32768" // 32KB)
+    </description>
+  </property>
+  <property>
+    <name>storage.buffercache.size</name>
+    <value>33554432</value>
+    <description>The size of memory allocated to the disk buffer cache.
+      The value should be a multiple of the buffer cache page size(Default
+      = "33554432" // 32MB)
+    </description>
+  </property>
+  <property>
+    <name>storage.memorycomponent.numpages</name>
+    <value>8</value>
+    <description>The number of pages to allocate for a memory component.
+      (Default = 8)
+    </description>
+  </property>
+  <property>
+    <name>plot.activate</name>
+    <value>false</value>
+    <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+    </description>
+  </property>
+</asterixConfiguration>
diff --git a/src/test/resources/conf/cluster.xml b/src/test/resources/conf/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/src/test/resources/conf/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+  <instance_name>asterix</instance_name>
+  <store>storage</store>
+
+  <data_replication>
+    <enabled>false</enabled>
+    <replication_port>2016</replication_port>
+    <replication_factor>2</replication_factor>
+    <auto_failover>false</auto_failover>
+    <replication_time_out>30</replication_time_out>
+  </data_replication>
+
+  <master_node>
+    <id>master</id>
+    <client_ip>127.0.0.1</client_ip>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <client_port>1098</client_port>
+    <cluster_port>1099</cluster_port>
+    <http_port>8888</http_port>
+  </master_node>
+  <node>
+    <id>nc1</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2016</replication_port>
+  </node>
+  <node>
+    <id>nc2</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2017</replication_port>
+  </node>
+</cluster>
\ No newline at end of file
diff --git a/src/test/resources/conf/hyracks-deployment.properties b/src/test/resources/conf/hyracks-deployment.properties
new file mode 100644
index 0000000..17a6772
--- /dev/null
+++ b/src/test/resources/conf/hyracks-deployment.properties
@@ -0,0 +1,21 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl
+cc.ip=127.0.0.1
+cc.port=1098
diff --git a/src/test/resources/conf/test.properties b/src/test/resources/conf/test.properties
new file mode 100644
index 0000000..86269c8
--- /dev/null
+++ b/src/test/resources/conf/test.properties
@@ -0,0 +1,22 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=nc1data
+nc2.stores=nc2data
+OutputDir=/tmp/asterix_output/
diff --git a/src/test/resources/optimizerts/queries/channel/channel-create.aql b/src/test/resources/optimizerts/queries/channel/channel-create.aql
new file mode 100644
index 0000000..4dc9291
--- /dev/null
+++ b/src/test/resources/optimizerts/queries/channel/channel-create.aql
@@ -0,0 +1,36 @@
+/*
+ * Description  : Check the Plan used by a channel
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+write output to nc1:"rttest/channel-create.adm";
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql b/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
new file mode 100644
index 0000000..682bd6d
--- /dev/null
+++ b/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
@@ -0,0 +1,40 @@
+/*
+ * Description  : Check the Plan for Subscribing to a channel    
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
+
+write output to nc1:"rttest/channel-subscribe.adm";
+
+create broker brokerA at "http://www.hello.com";
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live") on brokerA;
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql b/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
new file mode 100644
index 0000000..7cdec50
--- /dev/null
+++ b/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
@@ -0,0 +1,38 @@
+/*
+ * Description  : Check the Plan for Unsubscribing to a channel
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
+
+write output to nc1:"rttest/channel-unsubscribe.adm";
+
+unsubscribe "c45ef6d0-c5ae-4b9e-b5da-cf1932718296" from nearbyTweetChannel;
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/results/channel/channel-create.plan b/src/test/resources/optimizerts/results/channel/channel-create.plan
new file mode 100644
index 0000000..f597191
--- /dev/null
+++ b/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -0,0 +1,30 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+          -- ASSIGN  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- NESTED_LOOP  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
new file mode 100644
index 0000000..4530923
--- /dev/null
+++ b/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -0,0 +1,44 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+          -- ASSIGN  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- NESTED_LOOP  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- COMMIT  |PARTITIONED|
+      -- STREAM_PROJECT  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- INSERT_DELETE  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$8]  |PARTITIONED|
+              -- ASSIGN  |UNPARTITIONED|
+                -- STREAM_PROJECT  |UNPARTITIONED|
+                  -- ASSIGN  |UNPARTITIONED|
+                    -- STREAM_PROJECT  |UNPARTITIONED|
+                      -- ASSIGN  |UNPARTITIONED|
+                        -- ASSIGN  |UNPARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
new file mode 100644
index 0000000..a9e383a
--- /dev/null
+++ b/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -0,0 +1,44 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+          -- ASSIGN  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- NESTED_LOOP  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- MATERIALIZE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- BTREE_SEARCH  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql b/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
new file mode 100644
index 0000000..41b036a
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description  : Create Channel Test. Confirms that the subscription and result datasets are created
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql b/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
new file mode 100644
index 0000000..eb341e9
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel where $result.ChannelName = "nearbyTweetChannel"
+for $x in dataset Metadata.Dataset
+where $x.DatasetName = $result.SubscriptionsDatasetName
+or $x.DatasetName = $result.ResultsDatasetName
+return $x;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql b/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
new file mode 100644
index 0000000..7bace03
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description  : Create Channel Test
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql b/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..9a1e170
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel return $result;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
new file mode 100644
index 0000000..afc7d5e
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
@@ -0,0 +1,38 @@
+/*
+* Description  : Drop Channel Test. Check Metadata
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel1 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel2 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel3 using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
new file mode 100644
index 0000000..f466b9c
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+drop channel nearbyTweetChannel2;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
new file mode 100644
index 0000000..e762a27
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel
+for $x in dataset Metadata.Dataset
+where $x.DatasetName = $result.SubscriptionsDatasetName
+or $x.DatasetName = $result.ResultsDatasetName
+return $x;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
new file mode 100644
index 0000000..afc7d5e
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
@@ -0,0 +1,38 @@
+/*
+* Description  : Drop Channel Test. Check Metadata
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel1 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel2 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel3 using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
new file mode 100644
index 0000000..f466b9c
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+drop channel nearbyTweetChannel2;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..9a1e170
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel return $result;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
new file mode 100644
index 0000000..29b56e1
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
@@ -0,0 +1,56 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type userLocation as {
+  userId: int,
+  roomNumber: int
+}
+create type watchedUser as {
+  userId: int,
+  name: string
+}
+create type roomSecurity as {
+  roomNumber: int,
+  securityGuardName: string,
+  securityGuardNumber: string 
+}
+
+create dataset watchedUsers(watchedUser)
+primary key userId;
+
+create dataset roomSecurityAssignments(roomSecurity)
+primary key roomNumber;
+
+upsert into dataset roomSecurityAssignments([
+{"roomNumber":123, "securityGuardName":"Mike", "securityGuardNumber":"555-4815"},
+{"roomNumber":222, "securityGuardName":"Steven", "securityGuardNumber":"555-1623"},
+{"roomNumber":350, "securityGuardName":"Vassilis", "securityGuardNumber":"555-1234"}]
+);
+
+upsert into dataset watchedUsers([
+{"userId":1, "name":"suspectNumber1"}]
+);
+
+
+create dataset UserLocations(userLocation)
+primary key userId;
+
+create function RoomOccupants($room) {
+    for $location in dataset UserLocations 
+    where $location.roomNumber = $room 
+    return $location.userId
+};
+
+create broker brokerA at "http://www.notifyA.com";
+create broker brokerB at "http://www.notifyB.com";
+
+
+create repetitive channel roomRecords using RoomOccupants@1 period duration("PT5S");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
new file mode 100644
index 0000000..8e15e19
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
@@ -0,0 +1,12 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+
+use dataverse channels;
+
+subscribe to roomRecords (123) on brokerA;
+subscribe to roomRecords (350) on brokerB;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
new file mode 100644
index 0000000..15d6a5e
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
@@ -0,0 +1,15 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+
+from $test in dataset roomRecordsSubscriptions
+order by $test.BrokerName
+select {
+"broker":$test.BrokerName,
+"parameter":$test.param0
+}
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
new file mode 100644
index 0000000..15ebf7f
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
@@ -0,0 +1,16 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+
+use dataverse channels;
+
+
+upsert into dataset UserLocations([
+{"userId":1, "roomNumber":123},
+{"userId":2, "roomNumber":222},
+{"userId":3, "roomNumber":350}]
+);
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
new file mode 100644
index 0000000..891eeea
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
@@ -0,0 +1,7 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+5000
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
new file mode 100644
index 0000000..74f39a4
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
@@ -0,0 +1,14 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+
+upsert into dataset UserLocations([
+{"userId":1, "roomNumber":222},
+{"userId":2, "roomNumber":222},
+{"userId":3, "roomNumber":222}]
+);
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
new file mode 100644
index 0000000..f6295f0
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
@@ -0,0 +1,12 @@
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+
+from $result in dataset roomRecordsResults
+order by $result.result
+select $result.result;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
new file mode 100644
index 0000000..41b036a
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description  : Create Channel Test. Confirms that the subscription and result datasets are created
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
new file mode 100644
index 0000000..6d35506
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live");
+
+subscribe to nearbyTweetChannel (point("20.0, 20.0"), "Long");
+
+subscribe to nearbyTweetChannel (point("10.0, 10.0"), "Prosper");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql
new file mode 100644
index 0000000..4937840
--- /dev/null
+++ b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql
@@ -0,0 +1,5 @@
+use dataverse channels;
+
+for $test in dataset nearbyTweetChannelSubscriptions
+order by $test.param1
+return $test.param1;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm b/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
new file mode 100644
index 0000000..dde1ee9
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelResults", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Mon Sep 12 13:48:16 PDT 2016", "DatasetId": 103, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Mon Sep 12 13:48:16 PDT 2016", "DatasetId": 102, "PendingOp": 0 }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm b/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
new file mode 100644
index 0000000..e009733
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -0,0 +1 @@
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm b/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
new file mode 100644
index 0000000..4002a62
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
@@ -0,0 +1,4 @@
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:56 PDT 2016", "DatasetId": 103, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Subscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:56 PDT 2016", "DatasetId": 102, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:58 PDT 2016", "DatasetId": 107, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Subscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:58 PDT 2016", "DatasetId": 106, "PendingOp": 0 }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
new file mode 100644
index 0000000..7307d37
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
+, { "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm b/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
new file mode 100644
index 0000000..8f3c264
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
@@ -0,0 +1,2 @@
+{ "broker": "brokerA", "parameter": 123 }
+{ "broker": "brokerB", "parameter": 350 }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm b/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
new file mode 100644
index 0000000..c396a2c
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
@@ -0,0 +1,2 @@
+1
+3
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm b/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
new file mode 100644
index 0000000..a2e74f1
--- /dev/null
+++ b/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
@@ -0,0 +1,3 @@
+"Live"
+, "Long"
+, "Prosper"
\ No newline at end of file