diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index a787778..391d84e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -18,28 +18,15 @@
  */
 package org.apache.asterix.bad;
 
-import org.apache.asterix.common.metadata.DataverseName;
-
 public interface BADConstants {
     String SubscriptionId = "subscriptionId";
-    String BrokerName = "BrokerName";
     String ChannelName = "ChannelName";
     String ProcedureName = "ProcedureName";
-    String DataverseName = "DataverseName";
-    String BrokerEndPoint = "BrokerEndPoint";
     String DeliveryTime = "deliveryTime";
     String ResultId = "resultId";
-    String ChannelExecutionTime = "channelExecutionTime";
-    String ChannelSubscriptionsType = "ChannelSubscriptionsType";
     String ChannelResultsType = "ChannelResultsType";
     String ResultsDatasetName = "ResultsDatasetName";
     String SubscriptionsDatasetName = "SubscriptionsDatasetName";
-    String CHANNEL_EXTENSION_NAME = "Channel";
-    String PROCEDURE_KEYWORD = "Procedure";
-    String BROKER_KEYWORD = "Broker";
-    String RECORD_TYPENAME_BROKER = "BrokerRecordType";
-    String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
-    String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
     String subscriptionEnding = "Subscriptions";
     String resultsEnding = "Results";
     String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
@@ -52,10 +39,54 @@
     String FIELD_NAME_DEFINITION = "Definition";
     String FIELD_NAME_LANGUAGE = "Language";
     String FIELD_NAME_BODY = "Body";
+
+    /* --- Notification Fields --- */
+    String ChannelExecutionTime = "channelExecutionTime";
+    String CHANNEL_EXECUTION_EPOCH_TIME = "channelExecutionEpochTime";
+
+    // --- Active Dataset
+    String RECORD_TYPENAME_ACTIVE_RECORD = "ActiveRecordType";
+    String FIELD_NAME_ACTIVE_TS = "_active_timestamp";
+
     //To enable new Asterix TxnId for separate deployed job spec invocations
     byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
     int EXECUTOR_TIMEOUT = 20;
 
+    /* --- Metadata Common --- */
+    String METADATA_TYPE_NAME_DATAVERSENAME = "DataverseName";
+
+    String METADATA_DATASET_CHANNEL = "Channel";
+    String METADATA_DATASET_PROCEDURE = "Procedure";
+    String METADATA_DATASET_BROKER = "Broker";
+
+    /* --- Metadata Datatypes --- */
+    String METADATA_TYPENAME_SUBSCRIPTIONS = "ChannelSubscriptionsType";
+    String METADATA_TYPENAME_BROKER = "BrokerRecordType";
+    String METADATA_TYPENAME_CHANNEL = "ChannelRecordType";
+    String METADATA_TYPENAME_PROCEDURE = "ProcedureRecordType";
+
+    /* --- Broker Field Names --- */
+    String METADATA_TYPE_FIELD_NAME_BROKERNAME = "BrokerName";
+    String METADATA_TYPE_FIELD_NAME_BROKER_END_POINT = "BrokerEndPoint";
+    String METADATA_TYPE_FIELD_NAME_BROKER_TYPE = "BrokerType";
+
+    /*  ---  Runtime Entities ---  */
+    String RUNTIME_ENTITY_PROCEDURE = "Procedure";
+    String RUNTIME_ENTITY_CHANNEL = "Channel";
+
+    /* --- Query Compilation --- */
+    String CONFIG_CHANNEL_NAME = "_internal_channelName";
+
+    /* --- BAD ISLANDS --- */
+    String GENERAL_BROKER_TYPE_NAME = "general";
+    String BAD_BROKER_TYPE_NAME = "bad";
+    String BAD_BROKER_FIELD_NAME_TYPE = "broker-type";
+    String BAD_FEED_FIELD_NAME_HOST = "bad-host";
+    String BAD_FEED_FIELD_NAME_CHANNEL = "bad-channel";
+    String BAD_FEED_FIELD_NAME_PARAMETERS = "bad-channel-parameters";
+    String BAD_FEED_FIELD_NAME_CHANNEL_DV = "bad-dataverse";
+    // String BAD_FEED_TYPE = "type"
+
     public enum ChannelJobType {
         REPETITIVE
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index af4ff03..4342dc6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.bad;
 
-import java.io.StringReader;
 import java.time.Instant;
 import java.util.Date;
 import java.util.HashMap;
@@ -37,7 +36,7 @@
 import org.apache.asterix.app.result.fields.ResultsPrinter;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.lang.BADParserFactory;
-import org.apache.asterix.bad.lang.BADStatementExecutor;
+import org.apache.asterix.bad.lang.BADQueryTranslator;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.common.api.IResponsePrinter;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -66,44 +65,44 @@
     private static final Logger LOGGER = Logger.getLogger(BADJobService.class.getName());
 
     //pool size one (only running one thread at a time)
-    private static final int POOL_SIZE = 1;
+    public static final int POOL_SIZE = 1;
 
     private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000;
 
+    public static ScheduledExecutorService createExecutorServe() {
+        return Executors.newScheduledThreadPool(POOL_SIZE);
+    }
+
     public static void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec,
             IHyracksClientConnection hcc, DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory,
             String duration) throws Exception {
+        ScheduledExecutorService ses = createExecutorServe();
+        listener.setExecutorService(ses);
         if (channeljobSpec != null) {
             channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
             DeployedJobSpecId deployedId = hcc.deployJobSpec(channeljobSpec);
-            ScheduledExecutorService ses = startRepetitiveDeployedJobSpec(deployedId, hcc, findPeriod(duration),
-                    new HashMap<>(), entityId, txnIdFactory, listener);
+            startRepetitiveDeployedJobSpec(ses, deployedId, hcc, findPeriod(duration), new HashMap<>(), entityId,
+                    txnIdFactory, listener);
             listener.setDeployedJobSpecId(deployedId);
-            listener.setExecutorService(ses);
         }
-
     }
 
     //Starts running a deployed job specification periodically with an interval of "period" seconds
-    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
-            IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
-            ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) {
-        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
-        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId,
-                            txnIdFactory, listener)) {
-                        scheduledExecutorService.shutdown();
-                    }
-                } catch (Exception e) {
-                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
-                            + entityId.getDataverseName() + "." + entityId.getEntityName() + ".", e);
+    public static void startRepetitiveDeployedJobSpec(ScheduledExecutorService scheduledExecutorService,
+            DeployedJobSpecId distributedId, IHyracksClientConnection hcc, long period,
+            Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
+            DeployedJobSpecEventListener listener) {
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId, txnIdFactory,
+                        listener)) {
+                    scheduledExecutorService.shutdown();
                 }
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+                        + entityId.getDataverseName() + "." + entityId.getEntityName() + ".", e);
             }
         }, period, period, TimeUnit.MILLISECONDS);
-        return scheduledExecutorService;
     }
 
     public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
@@ -207,8 +206,8 @@
     }
 
     public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
-            BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
-            IRequestParameters requestParameters, boolean useNewId) throws Exception {
+            BADQueryTranslator badStatementExecutor, IHyracksClientConnection hcc, IRequestParameters requestParameters,
+            boolean useNewId) throws Exception {
 
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
@@ -220,7 +219,7 @@
         }
 
         BADParserFactory factory = new BADParserFactory();
-        List<Statement> fStatements = factory.createParser(new StringReader(queryBodyString)).parse();
+        List<Statement> fStatements = factory.createParser(queryBodyString).parse();
         JobSpecification jobSpec = null;
         if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
                 || listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADUtils.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADUtils.java
new file mode 100644
index 0000000..db1e06b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+
+public class BADUtils {
+
+    public static MetadataProvider replicateMetadataProvider(MetadataProvider metadataProvider) {
+        MetadataProvider tempMdProvider = MetadataProvider.create(metadataProvider.getApplicationContext(),
+                metadataProvider.getDefaultDataverse());
+        tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+        tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
+        tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+        tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+        tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+        tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
+        return tempMdProvider;
+    }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADLangExtension.java
similarity index 98%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADLangExtension.java
index 4e94508..024f3e4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADLangExtension.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.bad.lang;
+package org.apache.asterix.bad.extension;
 
 import java.util.List;
 
 import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.bad.lang.BADCompilationProvider;
 import org.apache.asterix.bad.metadata.AllChannelsSearchKey;
 import org.apache.asterix.bad.metadata.AllProceduresSearchKey;
 import org.apache.asterix.bad.metadata.Broker;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
similarity index 75%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
index 49371f9..d0639e2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
@@ -16,13 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.bad.metadata;
+package org.apache.asterix.bad.extension;
 
 import java.rmi.RemoteException;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.function.BADFunctions;
+import org.apache.asterix.bad.function.rewriter.BADFeedRewriter;
+import org.apache.asterix.bad.metadata.BADMetadataIndexes;
+import org.apache.asterix.bad.metadata.BADMetadataRecordTypes;
 import org.apache.asterix.common.api.ExtensionId;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -36,6 +40,9 @@
 import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.impl.ABooleanTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.ADateTimeTypeComputer;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -49,18 +56,21 @@
             NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
 
     public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
-            BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+            BADConstants.METADATA_TYPENAME_SUBSCRIPTIONS, BADMetadataRecordTypes.channelSubscriptionsType, false);
     public static final Datatype BAD_RESULT_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
             BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
 
     public static final Datatype BAD_BROKER_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
-            BADConstants.RECORD_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
+            BADConstants.METADATA_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
 
     public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
-            BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
+            BADConstants.METADATA_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
 
     public static final Datatype BAD_PROCEDURE_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
-            BADConstants.RECORD_TYPENAME_PROCEDURE, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, false);
+            BADConstants.METADATA_TYPENAME_PROCEDURE, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, false);
+
+    public static final Datatype BAD_ACTIVE_RECORD_TYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
+            BADConstants.RECORD_TYPENAME_ACTIVE_RECORD, BADMetadataRecordTypes.ACTIVE_RECORD_RECORD_TYPE, false);
 
     @Override
     public ExtensionId getId() {
@@ -112,6 +122,7 @@
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_ACTIVE_RECORD_TYPE);
                 // TODO prevent user from dropping these types
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             } catch (Exception e) {
@@ -121,5 +132,13 @@
         }
         // local recovery?
         // nothing for now
+        BuiltinFunctions.addFunction(BADFunctions.CURRENT_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
+        BuiltinFunctions.addFunction(BADFunctions.PREVIOUS_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
+        BuiltinFunctions.addFunction(BADFunctions.IS_NEW, ABooleanTypeComputer.INSTANCE, true);
+
+        // to shadow the master feed rewriter
+        BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, BADFeedRewriter.INSTANCE, true);
+        BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_COLLECT, false);
+        BuiltinFunctions.addDatasourceFunction(BuiltinFunctions.FEED_COLLECT, BADFeedRewriter.INSTANCE);
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADQueryTranslatorExtension.java
similarity index 94%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADQueryTranslatorExtension.java
index 23b44a7..fcb071c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADQueryTranslatorExtension.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.bad.lang;
+package org.apache.asterix.bad.extension;
 
 import java.util.List;
 
 import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.bad.lang.BADQueryTranslatorFactory;
 import org.apache.asterix.common.api.ExtensionId;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADRecoveryExtension.java
similarity index 94%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADRecoveryExtension.java
index 609b6de..995be2c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADRecoveryExtension.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.bad.recovery;
+package org.apache.asterix.bad.extension;
 
 import java.util.List;
 
 import org.apache.asterix.app.cc.IGlobalRecoveryExtension;
+import org.apache.asterix.bad.recovery.BADGlobalRecoveryManager;
 import org.apache.asterix.common.api.ExtensionId;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorDescriptor.java
new file mode 100644
index 0000000..8a92c56
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorDescriptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.feed.operators;
+
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+
+public class BADLSMPrimaryInsertOperatorDescriptor extends LSMPrimaryInsertOperatorDescriptor {
+    public BADLSMPrimaryInsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
+            IIndexDataflowHelperFactory keyIndexHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchOpCallbackFactory, int numOfPrimaryKeys, int[] filterFields) {
+        super(spec, outRecDesc, fieldPermutation, indexHelperFactory, keyIndexHelperFactory,
+                modificationOpCallbackFactory, searchOpCallbackFactory, numOfPrimaryKeys, filterFields);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+        return new BADLSMPrimaryInsertOperatorNodePushable(ctx, partition, indexHelperFactory, keyIndexHelperFactory,
+                fieldPermutation, intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numOfPrimaryKeys,
+                filterFields, sourceLoc);
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
new file mode 100644
index 0000000..813fd0b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.feed.operators;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+
+public class BADLSMPrimaryInsertOperatorNodePushable extends LSMPrimaryInsertOperatorNodePushable {
+
+    public BADLSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+            IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
+            int[] fieldPermutation, RecordDescriptor inputRecDesc,
+            IModificationOperationCallbackFactory modCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, keyIndexHelperFactory, fieldPermutation, inputRecDesc,
+                modCallbackFactory, searchCallbackFactory, numOfPrimaryKeys, filterFields, sourceLoc);
+    }
+
+    @Override
+    protected void beforeModification(ITupleReference tuple) {
+        if ((tuple.getFieldCount() == 3
+                && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
+                || (tuple.getFieldCount() == 4
+                        && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
+            int targetIdx = tuple.getFieldStart(2) + 14;
+            ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
+            tupleBuff.putLong(targetIdx, System.currentTimeMillis());
+        }
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
new file mode 100644
index 0000000..46f9bc8
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.feed.operators;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+
+public class BADLSMPrimaryUpsertOperatorDescriptor extends LSMPrimaryUpsertOperatorDescriptor {
+    public BADLSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
+            IMissingWriterFactory missingWriterFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchOpCallbackFactory,
+            IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, ARecordType recordType,
+            int filterIndex, boolean hasSecondaries) {
+        super(spec, outRecDesc, fieldPermutation, indexHelperFactory, missingWriterFactory,
+                modificationOpCallbackFactory, searchOpCallbackFactory, frameOpCallbackFactory, numPrimaryKeys,
+                recordType, filterIndex, hasSecondaries);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+        return new BADLSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
+                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, recordType, filterIndex,
+                frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
new file mode 100644
index 0000000..b0c79d5
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.feed.operators;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+
+public class BADLSMPrimaryUpsertOperatorNodePushable extends LSMPrimaryUpsertOperatorNodePushable {
+    public BADLSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+            IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
+            IModificationOperationCallbackFactory modCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
+            int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
+            IMissingWriterFactory missingWriterFactory, boolean hasSecondaries) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, modCallbackFactory,
+                searchCallbackFactory, numOfPrimaryKeys, recordType, filterFieldIndex, frameOpCallbackFactory,
+                missingWriterFactory, hasSecondaries);
+    }
+
+    @Override
+    protected void beforeModification(ITupleReference tuple) {
+        if ((tuple.getFieldCount() == 3
+                && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
+                || (tuple.getFieldCount() == 4
+                        && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
+            int targetIdx = tuple.getFieldStart(2) + 14;
+            ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
+            tupleBuff.putLong(targetIdx, System.currentTimeMillis());
+        }
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
new file mode 100644
index 0000000..15bb617
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
@@ -0,0 +1,37 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.asterix.bad.function;
+
+import java.util.ArrayList;
+
+import org.apache.asterix.om.functions.IFunctionCollection;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+
+public class BADFunctionCollection implements IFunctionCollection {
+
+    private final ArrayList<IFunctionDescriptorFactory> descriptorFactories = new ArrayList<>();
+
+    @Override
+    public void add(IFunctionDescriptorFactory descriptorFactory) {
+
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
new file mode 100644
index 0000000..15a5fc5
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
@@ -0,0 +1,34 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.bad.function;
+
+import org.apache.asterix.bad.function.runtime.CurrentChannelTimeDescriptor;
+import org.apache.asterix.bad.function.runtime.PreviousChannelTimeDescriptor;
+import org.apache.asterix.om.functions.IFunctionCollection;
+import org.apache.asterix.om.functions.IFunctionRegistrant;
+
+public class BADFunctionRegistrant implements IFunctionRegistrant {
+    @Override
+    public void register(IFunctionCollection fc) {
+        fc.add(PreviousChannelTimeDescriptor.FACTORY);
+        fc.add(CurrentChannelTimeDescriptor.FACTORY);
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
new file mode 100644
index 0000000..a342210
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.asterix.bad.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class BADFunctions {
+
+    public static final FunctionIdentifier IS_NEW = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "is-new", 1);
+
+    public static final FunctionIdentifier PREVIOUS_CHANNEL_TIME =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "previous-channel-time", 1);
+
+    public static final FunctionIdentifier CURRENT_CHANNEL_TIME =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "current-channel-time", 1);
+
+    public static final FunctionIdentifier ACTIVE_TIMESTAMP =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "active-timestamp", 1);
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java
new file mode 100644
index 0000000..7e25a23
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.function.rewriter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.translator.util.PlanTranslationUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public class BADFeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final BADFeedRewriter INSTANCE = new BADFeedRewriter();
+
+    private BADFeedRewriter() {
+    }
+
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
+                    "No positional variables are allowed over feeds.");
+        }
+        DataverseName dataverseName =
+                DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(f, 0));
+        String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
+        String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
+        String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
+        String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
+        String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        DataSourceId asid = new DataSourceId(dataverseName, getTargetFeed);
+        String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+        FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverseName, policyName);
+        if (policy == null) {
+            policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+            if (policy == null) {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
+                        "Unknown feed policy:" + policyName);
+            }
+        }
+        ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
+        String csLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+        List<LogicalVariable> pkVars = new ArrayList<>();
+        FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
+                metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
+        // The order for feeds is <Record-Meta-PK>
+        feedDataScanOutputVariables.add(unnest.getVariable());
+        // Does it produce meta?
+        if (ds.hasMeta()) {
+            feedDataScanOutputVariables.add(context.newVar());
+        }
+        // Does it produce pk?
+        if (ds.isChange()) {
+            feedDataScanOutputVariables.addAll(pkVars);
+        }
+        DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
+        scan.setSourceLocation(unnest.getSourceLocation());
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    private FeedDataSource createFeedDataSource(DataSourceId id, String targetDataset, String sourceFeedName,
+            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
+            String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
+            List<LogicalVariable> pkVars) throws AlgebricksException {
+        Dataset dataset = metadataProvider.findDataset(id.getDataverseName(), targetDataset);
+        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(id.getDataverseName(), outputType);
+        Feed sourceFeed = metadataProvider.findFeed(id.getDataverseName(), sourceFeedName);
+        FeedConnection feedConnection =
+                metadataProvider.findFeedConnection(id.getDataverseName(), sourceFeedName, targetDataset);
+        // Is a change feed?
+        List<IAType> pkTypes = null;
+        List<List<String>> partitioningKeys = null;
+        List<Integer> keySourceIndicator = null;
+
+        List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
+        if (ExternalDataUtils.isChangeFeed(sourceFeed.getConfiguration())) {
+            List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>();
+            keyAccessScalarFunctionCallExpression = new ArrayList<>();
+            pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
+            partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+            if (dataset.hasMetaPart()) {
+                keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                List<String> key = partitioningKeys.get(i);
+                if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
+                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
+                            context, null);
+                } else {
+                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
+                            null, context, null);
+                }
+            }
+            keyAccessExpression.forEach(
+                    expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
+        } else {
+            keyAccessScalarFunctionCallExpression = null;
+        }
+        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, id, targetDataset, feedOutputType, null, pkTypes,
+                keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
+                FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(),
+                feedConnection);
+        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
+        return feedDataSource;
+    }
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
+            throw new AlgebricksException("Incorrect number of arguments -> arity is "
+                    + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
+        }
+        DataverseName dataverseName =
+                DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(f, 0));
+        String outputTypeName = ConstantExpressionUtil.getStringArgument(f, 5);
+        if (outputTypeName == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        IAType outputType = metadata.findType(dataverseName, outputTypeName);
+        if (outputType == null) {
+            throw new AlgebricksException("Unknown type " + outputTypeName);
+        }
+        return outputType;
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
new file mode 100644
index 0000000..f3392ad
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
@@ -0,0 +1,116 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.bad.function.runtime;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.bad.function.BADFunctions;
+import org.apache.asterix.bad.runtime.ActiveTimestampManager;
+import org.apache.asterix.bad.runtime.ActiveTimestampState;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class CurrentChannelTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = BADFunctions.CURRENT_CHANNEL_TIME;
+    private final static long ActiveStateKey = 1024;
+
+    public final static IFunctionDescriptorFactory FACTORY = CurrentChannelTimeDescriptor::new;
+
+    private CurrentChannelTimeDescriptor() {
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IEvaluatorContext ctx) throws HyracksDataException {
+                return new IScalarEvaluator() {
+
+                    private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+                    private DataOutput out = resultStorage.getDataOutput();
+
+                    private final IPointable argPtr0 = new VoidPointable();
+                    private final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx);
+
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ADateTime> datetimeSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+                    private AMutableDateTime aDateTime = new AMutableDateTime(0);
+                    private IHyracksJobletContext jobletCtx = ctx.getTaskContext().getJobletContext();
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+                        ActiveTimestampState existingState =
+                                (ActiveTimestampState) ctx.getTaskContext().getStateObject(ActiveStateKey);
+                        resultStorage.reset();
+
+                        if (existingState == null) {
+                            eval0.evaluate(tuple, argPtr0);
+                            String channelName = new String(argPtr0.getByteArray(), argPtr0.getStartOffset() + 2,
+                                    argPtr0.getLength() - 2);
+                            ActiveTimestampManager.progressChannelExecutionTimestamps(jobletCtx.getJobId(), channelName,
+                                    jobletCtx.getServiceContext().getNodeId());
+                            String nodeId = jobletCtx.getServiceContext().getNodeId();
+                            long previousChannelTime =
+                                    ActiveTimestampManager.getPreviousChannelExecutionTimestamp(channelName, nodeId);
+                            long currentChannelTime =
+                                    ActiveTimestampManager.getCurrentChannelExecutionTimestamp(channelName, nodeId);
+                            existingState = new ActiveTimestampState(jobletCtx.getJobId(), ActiveStateKey);
+                            existingState.setExecutionTime(previousChannelTime, currentChannelTime);
+                            ctx.getTaskContext().setStateObject(existingState);
+                        }
+                        aDateTime.setValue(existingState.getCurrentChannelExecutionTime());
+                        datetimeSerde.serialize(aDateTime, out);
+                        result.set(resultStorage);
+                    }
+                };
+            }
+        };
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.asterix.om.functions.IFunctionDescriptor#getIdentifier()
+     */
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
new file mode 100644
index 0000000..f4dd1f5
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.bad.function.runtime;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.bad.function.BADFunctions;
+import org.apache.asterix.bad.runtime.ActiveTimestampManager;
+import org.apache.asterix.bad.runtime.ActiveTimestampState;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class PreviousChannelTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = BADFunctions.PREVIOUS_CHANNEL_TIME;
+    private final static long ActiveStateKey = 1024;
+
+    public final static IFunctionDescriptorFactory FACTORY = PreviousChannelTimeDescriptor::new;
+
+    private PreviousChannelTimeDescriptor() {
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
+                return new IScalarEvaluator() {
+
+                    private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+                    private DataOutput out = resultStorage.getDataOutput();
+                    private final IPointable argPtr0 = new VoidPointable();
+                    private final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx);
+
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ADateTime> datetimeSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+                    private AMutableDateTime aDateTime = new AMutableDateTime(0);
+                    private IHyracksJobletContext jobletCtx = ctx.getTaskContext().getJobletContext();
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+
+                        ActiveTimestampState existingState =
+                                (ActiveTimestampState) ctx.getTaskContext().getStateObject(ActiveStateKey);
+                        resultStorage.reset();
+
+                        if (existingState == null) {
+                            eval0.evaluate(tuple, argPtr0);
+                            // offset the type
+                            String channelName = new String(argPtr0.getByteArray(), argPtr0.getStartOffset() + 2,
+                                    argPtr0.getLength() - 2);
+                            String nodeId = jobletCtx.getServiceContext().getNodeId();
+                            ActiveTimestampManager.progressChannelExecutionTimestamps(jobletCtx.getJobId(), channelName,
+                                    nodeId);
+                            long previousChannelTime =
+                                    ActiveTimestampManager.getPreviousChannelExecutionTimestamp(channelName, nodeId);
+                            long currentChannelTime =
+                                    ActiveTimestampManager.getCurrentChannelExecutionTimestamp(channelName, nodeId);
+                            existingState = new ActiveTimestampState(jobletCtx.getJobId(), ActiveStateKey);
+                            existingState.setExecutionTime(previousChannelTime, currentChannelTime);
+                            ctx.getTaskContext().setStateObject(existingState);
+                        }
+                        aDateTime.setValue(existingState.getPreviousChannelExecutionTime());
+                        datetimeSerde.serialize(aDateTime, out);
+                        result.set(resultStorage);
+                    }
+                };
+            }
+        };
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.asterix.om.functions.IFunctionDescriptor#getIdentifier()
+     */
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
index 99f0d66..31a185a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.bad.lang;
 
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
 import org.apache.asterix.compiler.provider.IRuleSetFactory;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.lang.common.base.IParserFactory;
@@ -34,4 +35,8 @@
         return new BADRuleSetFactory();
     }
 
+    @Override
+    public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
+        return new BADExpressionToPlanTranslatorFactory();
+    }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
new file mode 100644
index 0000000..23d10c5
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.lang;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.translator.CompiledStatements;
+import org.apache.asterix.translator.SqlppExpressionToPlanTranslator;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.result.IResultMetadata;
+
+/**
+ * This class overrides the SqlppExpressionToPlanTranslator in AsterixDB to allow insert/upsert to
+ * datasets with meta records (active datasets). If inserting/upserting into an active dataset, the plan
+ * translator would attach a dummy active records containing the current timestamp as the active timestamp.
+ * In the case of data feeds, this active timestamp would then be updated by
+ * BADLSMPrimaryInsertOperatorNodePushable/BADLSMPrimaryUpsertOperatorNodePushable.
+ * Updates to SqlppExpressionToPlanTranslator in the AsterixDB master needs to be propagated into this class when
+ * bringing the BAD codebase to latest master.
+ *
+ * IMPORTANT NOTE: Currently, we assume active datasets are the only user of the datasets with meta records.
+ * If one want to use meta datasets in the BAD branch in the future, this needs to be refactored.
+ */
+public class BADExpressionToPlanTranslator extends SqlppExpressionToPlanTranslator {
+
+    public BADExpressionToPlanTranslator(MetadataProvider metadataProvider, int currentVarCounter,
+            Map<VarIdentifier, IAObject> externalVars) throws AlgebricksException {
+        super(metadataProvider, currentVarCounter, externalVars);
+    }
+
+    @Override
+    protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading,
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator inputOp,
+            CompiledStatements.ICompiledDmlStatement stmt) throws AlgebricksException {
+        SourceLocation sourceLoc = stmt.getSourceLocation();
+        InsertDeleteUpsertOperator deleteOp;
+        if (!targetDatasource.getDataset().hasMetaPart()) {
+            deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    InsertDeleteUpsertOperator.Kind.DELETE, false);
+        } else {
+            // prepare meta record
+            IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
+            LogicalVariable metaVar = context.newVar();
+            AssignOperator metaVariableAssignOp =
+                    new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
+            metaVariableAssignOp.getInputs().add(new MutableObject<>(inputOp));
+            metaVariableAssignOp.setSourceLocation(sourceLoc);
+            // create insert op uses meta record
+            deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
+                    InsertDeleteUpsertOperator.Kind.DELETE, false);
+            // change current inputOp to be meta op
+            inputOp = metaVariableAssignOp;
+        }
+        deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+        deleteOp.getInputs().add(new MutableObject<>(inputOp));
+        deleteOp.setSourceLocation(sourceLoc);
+        DelegateOperator leafOperator = new DelegateOperator(new CommitOperator(true));
+        leafOperator.getInputs().add(new MutableObject<>(deleteOp));
+        leafOperator.setSourceLocation(sourceLoc);
+        return leafOperator;
+    }
+
+    @Override
+    protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> filterExprs,
+            ILogicalOperator pkeyAssignOp, List<String> additionalFilteringField, LogicalVariable unnestVar,
+            ILogicalOperator topOp, List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar,
+            AssignOperator additionalFilteringAssign, CompiledStatements.ICompiledDmlStatement stmt,
+            IResultMetadata resultMetadata) throws AlgebricksException {
+        SourceLocation sourceLoc = stmt.getSourceLocation();
+        CompiledStatements.CompiledUpsertStatement compiledUpsert = (CompiledStatements.CompiledUpsertStatement) stmt;
+        Expression returnExpression = compiledUpsert.getReturnExpression();
+        InsertDeleteUpsertOperator upsertOp;
+        ILogicalOperator rootOperator;
+
+        ARecordType recordType = (ARecordType) targetDatasource.getItemType();
+
+        if (targetDatasource.getDataset().hasMetaPart()) {
+            IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
+            LogicalVariable metaVar = context.newVar();
+            AssignOperator metaVariableAssignOp =
+                    new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
+            metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+            pkeyAssignOp = metaVariableAssignOp;
+
+            metaVariableAssignOp.setSourceLocation(sourceLoc);
+            List<Mutable<ILogicalExpression>> metaExprs = new ArrayList<>(1);
+            VariableReferenceExpression metaVarRef = new VariableReferenceExpression(metaVar);
+            metaExprs.add(new MutableObject<>(metaVarRef));
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExprs,
+                    InsertDeleteUpsertOperator.Kind.UPSERT, false);
+
+            // set previous meta vars
+            List<LogicalVariable> metaVars = new ArrayList<>();
+            metaVars.add(context.newVar());
+            upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
+            List<Object> metaTypes = new ArrayList<>();
+            metaTypes.add(targetDatasource.getMetaItemType());
+            upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+        } else {
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            // Create and add a new variable used for representing the original record
+            if (additionalFilteringField != null) {
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+            }
+        }
+        // Create and add a new variable used for representing the original record
+        upsertOp.setUpsertIndicatorVar(context.newVar());
+        upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+        upsertOp.setPrevRecordVar(context.newVar());
+        upsertOp.setPrevRecordType(recordType);
+        upsertOp.setSourceLocation(sourceLoc);
+        upsertOp.setAdditionalFilteringExpressions(filterExprs);
+        upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+
+        // Set up delegate operator
+        DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
+        delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
+        delegateOperator.setSourceLocation(sourceLoc);
+        rootOperator = delegateOperator;
+
+        // Compiles the return expression.
+        return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
+    }
+
+    @Override
+    protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> filterExprs,
+            ILogicalOperator inputOp, CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata)
+            throws AlgebricksException {
+        SourceLocation sourceLoc = stmt.getSourceLocation();
+
+        InsertDeleteUpsertOperator insertOp;
+        if (!targetDatasource.getDataset().hasMetaPart()) {
+            insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    InsertDeleteUpsertOperator.Kind.INSERT, false);
+        } else {
+            // prepare meta record
+            IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
+            LogicalVariable metaVar = context.newVar();
+            AssignOperator metaVariableAssignOp =
+                    new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
+            metaVariableAssignOp.getInputs().add(new MutableObject<>(inputOp));
+            metaVariableAssignOp.setSourceLocation(sourceLoc);
+            // create insert op uses meta record
+            insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
+                    InsertDeleteUpsertOperator.Kind.INSERT, false);
+            // change current inputOp to be meta op
+            inputOp = metaVariableAssignOp;
+        }
+        insertOp.setAdditionalFilteringExpressions(filterExprs);
+        insertOp.getInputs().add(new MutableObject<>(inputOp));
+        insertOp.setSourceLocation(sourceLoc);
+
+        // Adds the commit operator.
+        CompiledStatements.CompiledInsertStatement compiledInsert = (CompiledStatements.CompiledInsertStatement) stmt;
+        Expression returnExpression = compiledInsert.getReturnExpression();
+        DelegateOperator rootOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
+        rootOperator.getInputs().add(new MutableObject<>(insertOp));
+        rootOperator.setSourceLocation(sourceLoc);
+
+        // Compiles the return expression.
+        return processReturningExpression(rootOperator, insertOp, compiledInsert, resultMetadata);
+    }
+
+    private ILogicalExpression makeMetaRecordExpr(IAType metaRecordType) {
+        ARecord metaRecord =
+                new ARecord((ARecordType) metaRecordType, new IAObject[] { new ADateTime(System.currentTimeMillis()) });
+        IAlgebricksConstantValue metaConstantVal = new AsterixConstantValue(metaRecord);
+        ILogicalExpression expr = new ConstantExpression(metaConstantVal);
+        return expr;
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslatorFactory.java
new file mode 100644
index 0000000..68a50ee
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslatorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.lang;
+
+import java.util.Map;
+
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public class BADExpressionToPlanTranslatorFactory implements ILangExpressionToPlanTranslatorFactory {
+    @Override
+    public ILangExpressionToPlanTranslator createExpressionToPlanTranslator(MetadataProvider metadataProvider,
+            int currentVarCounter, Map<VarIdentifier, IAObject> externalVars) throws AlgebricksException {
+        return new BADExpressionToPlanTranslator(metadataProvider, currentVarCounter, externalVars);
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangUtils.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangUtils.java
new file mode 100644
index 0000000..a8301a1
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.lang;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public class BADLangUtils {
+
+    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(BADLangUtils.class);
+
+    public static int executeStatement(String host, String stmt) throws Exception {
+        URI badURI = URI.create("http://" + host + ":19002/query/service");
+        RequestBuilder requestBuilder = RequestBuilder.post(badURI);
+        requestBuilder.addParameter("statement", stmt);
+        try {
+            HttpResponse response = submitRequest(requestBuilder.build());
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new AlgebricksException("Connecting to " + host + " failed");
+            }
+            return response.getStatusLine().getStatusCode();
+        } catch (Exception e) {
+            LOGGER.error("Statement Failed at " + host);
+            throw e;
+        }
+    }
+
+    private static HttpResponse submitRequest(HttpUriRequest request) throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CloseableHttpClient client = HttpClients.createDefault();
+        Future<HttpResponse> response = executor.submit(() -> {
+            try {
+                return client.execute(request);
+            } catch (Exception e) {
+                throw e;
+            }
+        });
+        try {
+            return response.get();
+        } catch (Exception e) {
+            client.close();
+            throw e;
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java
similarity index 70%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java
index 3cf2ce6..a0d33a1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java
@@ -20,14 +20,18 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.BADJobService;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
 import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
+import org.apache.asterix.bad.metadata.BADMetadataProvider;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
@@ -38,18 +42,24 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -57,9 +67,9 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 
-public class BADStatementExecutor extends QueryTranslator {
+public class BADQueryTranslator extends QueryTranslator {
 
-    public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
+    public BADQueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, ExecutorService executorService, IResponsePrinter printer) {
         super(appCtx, statements, output, compliationProvider, executorService, printer);
     }
@@ -80,7 +90,6 @@
                     if (!checkAll) {
                         return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
                     }
-
                 }
             }
 
@@ -103,8 +112,7 @@
     }
 
     private Pair<List<Channel>, List<Procedure>> checkIfFunctionIsInUse(MetadataTransactionContext mdTxnCtx,
-            DataverseName dvId, String function, String arity, boolean checkAll)
-            throws CompilationException, AlgebricksException {
+            DataverseName dvId, String function, String arity, boolean checkAll) throws AlgebricksException {
         List<Channel> channelsUsingFunction = new ArrayList<>();
         List<Procedure> proceduresUsingFunction = new ArrayList<>();
 
@@ -142,7 +150,7 @@
     }
 
     private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String dataset)
-            throws CompilationException, AlgebricksException {
+            throws AlgebricksException {
         Pair<List<Channel>, List<Procedure>> dependents = checkIfDatasetIsInUse(mdTxnCtx, dataverse, dataset, false);
         if (dependents.first.size() > 0) {
             throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
@@ -155,7 +163,7 @@
     }
 
     private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String function,
-            String arity, FunctionSignature sig) throws CompilationException, AlgebricksException {
+            String arity, FunctionSignature sig) throws AlgebricksException {
         Pair<List<Channel>, List<Procedure>> dependents =
                 checkIfFunctionIsInUse(mdTxnCtx, dataverse, function, arity, false);
         String errorStart = sig != null ? "Cannot drop function " + sig + "." : "Cannot drop index.";
@@ -170,6 +178,127 @@
     }
 
     @Override
+    protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+        CreateFeedStatement cfs = (CreateFeedStatement) stmt;
+        Map<String, String> feedConfig = cfs.getConfiguration();
+        if (feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)) {
+
+            // check parameters
+            if (!feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)
+                    || !feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL)
+                    || !feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_PARAMETERS)
+                    || !feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL_DV)) {
+                throw new AlgebricksException(
+                        "A BAD feed requires the host, dataverse name, channel name, and channel parameters of the other BAD system.");
+            }
+
+            // check format and http feed
+            if (!feedConfig.containsKey(ExternalDataConstants.KEY_ADAPTER_NAME)
+                    || !feedConfig.get(ExternalDataConstants.KEY_ADAPTER_NAME).toLowerCase().equals("http_adapter")) {
+                throw new AlgebricksException("A BAD feed needs a http adapter.");
+            }
+            if (!feedConfig.containsKey(ExternalDataConstants.KEY_FORMAT)
+                    || !feedConfig.get(ExternalDataConstants.KEY_FORMAT).toLowerCase().equals("adm")) {
+                throw new AlgebricksException("A BAD feed requires incoming data to be in ADM format.");
+            }
+            if (!feedConfig.containsKey(ExternalDataConstants.KEY_MODE)
+                    || !feedConfig.get(ExternalDataConstants.KEY_MODE).toLowerCase().equals("ip")) {
+                throw new AlgebricksException("A BAD feed requires an IP address.");
+            }
+        }
+        super.handleCreateFeedStatement(metadataProvider, stmt);
+    }
+
+    @Override
+    protected void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        StartFeedStatement sfs = (StartFeedStatement) stmt;
+        DataverseName dataverseName = getActiveDataverseName(sfs.getDataverseName());
+        String feedName = sfs.getFeedName().getValue();
+
+        // Retrieve Feed entity from Metadata
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
+                metadataProvider.getMetadataTxnContext());
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        // If it's a BAD feed
+        Map<String, String> feedConfig = feed.getConfiguration();
+        if (feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)) {
+            String badHost = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_HOST, null);
+            String badChannelName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL, null);
+            String badParameters = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_PARAMETERS, null);
+            String badDataverseName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL_DV, null);
+
+            // construct statements
+            // add Broker to feed name as the broker name
+            try {
+                // create broker
+                String connStmtStr = String.format(
+                        "USE %s;\n"
+                                + "DROP BROKER %BROKER IF EXISTS; CREATE BROKER %sBroker AT \"http://%s\" with {\"broker-type\" : \"BAD\"};\n",
+                        badDataverseName, feed.getFeedName(), feed.getFeedName(),
+                        feed.getConfiguration().get("addresses"));
+                BADLangUtils.executeStatement(badHost, connStmtStr);
+
+                // create subs
+                String[] params = badParameters.split(";");
+                StringBuilder subStmtStr = new StringBuilder(String.format("USE %s;\n ", badDataverseName));
+                for (String param : params) {
+                    subStmtStr.append(String.format("SUBSCRIBE TO %s(%s) on %sBroker;", badChannelName, param,
+                            feed.getFeedName()));
+                }
+                BADLangUtils.executeStatement(badHost, subStmtStr.toString());
+            } catch (Exception e) {
+                // drop broker and all subs if anything goes wrong
+                String dropStmtStr = String.format(
+                        "USE %s;\n"
+                                + "DELETE FROM %sSubscriptions s WHERE s.BrokerName = \"%sBroker\"; DROP BROKER %sBroker",
+                        badDataverseName, badChannelName, feed.getFeedName(), feed.getFeedName());
+                BADLangUtils.executeStatement(badHost, dropStmtStr);
+                throw e;
+            }
+        }
+        MetadataProvider badMetadataProvider = BADMetadataProvider.create(metadataProvider.getApplicationContext(),
+                metadataProvider.getDefaultDataverse());
+        super.handleStartFeedStatement(badMetadataProvider, stmt, hcc);
+    }
+
+    @Override
+    protected void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+        StopFeedStatement sfst = (StopFeedStatement) stmt;
+        DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName());
+        String feedName = sfst.getFeedName().getValue();
+
+        // Retrieve Feed entity from Metadata
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
+                metadataProvider.getMetadataTxnContext());
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        Map<String, String> feedConfig = feed.getConfiguration();
+        if (feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)) {
+            String badHost = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_HOST, null);
+            String badChannelName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL, null);
+            String badDataverseName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL_DV, null);
+
+            // construct statements
+            String dropStmtStr = String.format(
+                    "USE %s;\n"
+                            + "DELETE FROM %sSubscriptions s WHERE s.BrokerName = \"%sBroker\"; DROP BROKER %sBroker",
+                    badDataverseName, badChannelName, feed.getFeedName(), feed.getFeedName());
+
+            // make request
+            int responseCode = BADLangUtils.executeStatement(badHost, dropStmtStr);
+            if (responseCode != 200) {
+                throw new AlgebricksException("Connecting to " + badHost + " failed");
+            }
+        }
+    }
+
+    @Override
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index ab5a96e..f7ca86b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -35,6 +35,6 @@
     public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider,
             IResponsePrinter printer) {
-        return new BADStatementExecutor(appCtx, statements, output, compilationProvider, executorService, printer);
+        return new BADQueryTranslator(appCtx, statements, output, compilationProvider, executorService, printer);
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 8d4b1e5..3c3ec59 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,57 +22,98 @@
 import java.util.List;
 
 import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.bad.rules.RewriteChannelTimeFunctionToLocalVarRule;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
 import org.apache.asterix.compiler.provider.IRuleSetFactory;
 import org.apache.asterix.optimizer.base.RuleCollections;
+import org.apache.asterix.optimizer.rules.FeedScanCollectionToUnnest;
+import org.apache.asterix.optimizer.rules.MetaFunctionToMetaVariableRule;
 import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
 import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class BADRuleSetFactory implements IRuleSetFactory {
 
+    private boolean isSameRuleCollection(List<IAlgebraicRewriteRule> listA, List<IAlgebraicRewriteRule> listB) {
+        if (listA.size() != listB.size()) {
+            return false;
+        }
+        for (int i = 0; i < listA.size(); i++) {
+            if (!listA.get(i).getClass().equals(listB.get(i).getClass())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void updateNormalizationRules(
+            List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet,
+            ICcApplicationContext appCtx) {
+        // gen original normalization rules
+        List<IAlgebraicRewriteRule> originalNormalizationRules =
+                RuleCollections.buildNormalizationRuleCollection(appCtx);
+        // make a copy
+        List<IAlgebraicRewriteRule> alteredNormalizationRules = new ArrayList<>();
+        alteredNormalizationRules.addAll(originalNormalizationRules);
+
+        // insert the broker rule
+        for (int i = 0; i < alteredNormalizationRules.size(); i++) {
+            IAlgebraicRewriteRule rule = alteredNormalizationRules.get(i);
+            if (rule instanceof UnnestToDataScanRule) {
+                alteredNormalizationRules.add(i + 1, new InsertBrokerNotifierForChannelRule());
+            }
+        }
+
+        // replace all normalization rule collections with the new one
+        SequentialFixpointRuleController seqOnceCtrl = new SequentialFixpointRuleController(true);
+        for (int i = 0; i < logicalRuleSet.size(); i++) {
+            List<IAlgebraicRewriteRule> existingRuleCollection = logicalRuleSet.get(i).second;
+            if (isSameRuleCollection(existingRuleCollection, originalNormalizationRules)) {
+                logicalRuleSet.set(i, new Pair<>(seqOnceCtrl, alteredNormalizationRules));
+            }
+        }
+    }
+
+    private void addRewriteChannelTimeFunctionRule(
+            List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet,
+            ICcApplicationContext appCtx) {
+        // gen original normalization rules
+        List<IAlgebraicRewriteRule> originalRuleCollection = RuleCollections.buildLoadFieldsRuleCollection(appCtx);
+        // make a copy
+        List<IAlgebraicRewriteRule> alteredRuleCollection = new ArrayList<>();
+        alteredRuleCollection.addAll(originalRuleCollection);
+        // insert the broker rule
+        for (int i = 0; i < alteredRuleCollection.size(); i++) {
+            IAlgebraicRewriteRule rule = alteredRuleCollection.get(i);
+            if (rule instanceof FeedScanCollectionToUnnest) {
+                alteredRuleCollection.add(i + 1, new MetaFunctionToMetaVariableRule());
+                alteredRuleCollection.add(i + 1, new RewriteChannelTimeFunctionToLocalVarRule());
+            }
+        }
+
+        // replace all normalization rule collections with the new one
+        SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(true);
+        for (int i = 0; i < logicalRuleSet.size(); i++) {
+            List<IAlgebraicRewriteRule> existingRuleCollection = logicalRuleSet.get(i).second;
+            if (isSameRuleCollection(existingRuleCollection, originalRuleCollection)) {
+                logicalRuleSet.set(i, new Pair<>(seqCtrlNoDfs, alteredRuleCollection));
+            }
+        }
+    }
+
     @Override
     public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(
             ICcApplicationContext appCtx) throws AlgebricksException {
         List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet =
                 DefaultRuleSetFactory.buildLogical(appCtx);
 
-        List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection(appCtx);
-        List<IAlgebraicRewriteRule> alteredNormalizationCollection = new ArrayList<>();
-        alteredNormalizationCollection.addAll(normalizationCollection);
+        updateNormalizationRules(logicalRuleSet, appCtx);
+        addRewriteChannelTimeFunctionRule(logicalRuleSet, appCtx);
 
-        //Create a normalization collection that includes the broker rule
-        for (int i = 0; i < alteredNormalizationCollection.size(); i++) {
-            IAlgebraicRewriteRule rule = alteredNormalizationCollection.get(i);
-            if (rule instanceof UnnestToDataScanRule) {
-                alteredNormalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule());
-                break;
-            }
-        }
-
-        //Find instances of the normalization collection and replace them with the new one
-        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
-        for (int i = 0; i < logicalRuleSet.size(); i++) {
-            List<IAlgebraicRewriteRule> collection = logicalRuleSet.get(i).second;
-            if (collection.size() == normalizationCollection.size()) {
-                boolean isNormalizationCollection = true;
-                for (int j = 0; j < collection.size(); j++) {
-                    //Make sure the set of rules is the same
-                    if (!collection.get(j).getClass().equals(normalizationCollection.get(j).getClass())) {
-                        isNormalizationCollection = false;
-                        break;
-                    }
-                }
-                if (isNormalizationCollection) {
-                    //replace with the new collection
-                    logicalRuleSet.set(i, new Pair<>(seqOnceCtrl, alteredNormalizationCollection));
-                }
-            }
-        }
         return logicalRuleSet;
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
similarity index 79%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
index e29ff67..3020cb0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
@@ -18,9 +18,6 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +30,8 @@
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.BADJobService;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.BADUtils;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
@@ -43,17 +41,14 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.IndexedTypeExpression;
-import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.TypeExpression;
 import org.apache.asterix.lang.common.expression.TypeReferenceExpression;
-import org.apache.asterix.lang.common.literal.StringLiteral;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
@@ -66,9 +61,7 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.om.base.temporal.ADurationParserFactory;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -79,26 +72,24 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultSet;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
-public class CreateChannelStatement extends ExtensionStatement {
+public abstract class AbstractCreateChannelStatement extends ExtensionStatement {
 
-    private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
-    private final Identifier channelName;
-    private final FunctionSignature function;
-    private final CallExpr period;
+    private static final Logger LOGGER = Logger.getLogger(AbstractCreateChannelStatement.class.getName());
+    protected String duration;
+    protected FunctionSignature function;
+    protected final CallExpr period;
     private DataverseName dataverseName;
-    private String duration;
     private String body;
     private String subscriptionsTableName;
     private String resultsTableName;
     private final boolean push;
+    private final Identifier channelName;
 
-    public CreateChannelStatement(DataverseName dataverseName, Identifier channelName, FunctionSignature function,
-            Expression period, boolean push) {
+    public AbstractCreateChannelStatement(DataverseName dataverseName, Identifier channelName, Expression period,
+            boolean push) {
         this.channelName = channelName;
         this.dataverseName = dataverseName;
-        this.function = function;
         this.period = (CallExpr) period;
         this.duration = "";
         this.push = push;
@@ -108,18 +99,6 @@
         return dataverseName;
     }
 
-    public Identifier getChannelName() {
-        return channelName;
-    }
-
-    public String getResultsName() {
-        return resultsTableName;
-    }
-
-    public String getSubscriptionsName() {
-        return subscriptionsTableName;
-    }
-
     public String getDuration() {
         return duration;
     }
@@ -142,27 +121,13 @@
         return null;
     }
 
-    public void initialize(MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException {
-        Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
-        if (lookup == null) {
-            throw new MetadataException(" Unknown function " + function.getName());
-        }
-
-        if (!period.getFunctionSignature().getName().equals("duration")) {
-            throw new MetadataException(
-                    "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
-        }
-        duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
-        IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(bos);
-        durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
-    }
+    protected abstract void initialize(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx) throws Exception;
 
     private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws AsterixException, Exception {
+            IHyracksClientConnection hcc) throws Exception {
 
-        Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
+        Identifier subscriptionsTypeName = new Identifier(BADConstants.METADATA_TYPENAME_SUBSCRIPTIONS);
         Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
         //Setup the subscriptions dataset
         List<List<String>> partitionFields = new ArrayList<>();
@@ -232,18 +197,25 @@
         builder.append("select result, ");
         builder.append(BADConstants.ChannelExecutionTime + ", ");
         builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
+        // builder.append("b." + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE + " as "
+        //         + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE + ",");
+        // builder.append("b." + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT + " as " + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT + ",");
         builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
         builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
-        builder.append(MetadataConstants.METADATA_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
-        //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
-        builder.append(function.getDataverseName().getCanonicalForm() + "." + function.getName() + "(");
-        int i = 0;
-        for (; i < function.getArity() - 1; i++) {
-            builder.append("sub.param" + i + ",");
+        builder.append(
+                MetadataConstants.METADATA_DATAVERSE_NAME + ".`" + BADConstants.METADATA_DATASET_BROKER + "` b, \n");
+        builder.append(function.getDataverseName() + "." + function.getName() + "(");
+        for (int iter1 = 0; iter1 < function.getArity(); iter1++) {
+            if (iter1 > 0) {
+                builder.append(", ");
+            }
+            builder.append("sub.param" + iter1);
         }
-        builder.append("sub.param" + i + ") result \n");
-        builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
-        builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
+        builder.append(") result \n");
+        builder.append("where sub." + BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME + " /*+ bcast */ = b."
+                + BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME + "\n");
+        builder.append("and sub." + BADConstants.METADATA_TYPE_NAME_DATAVERSENAME + " /*+ bcast */ = b."
+                + BADConstants.METADATA_TYPE_NAME_DATAVERSENAME + "\n");
         if (!push) {
             builder.append(")");
             builder.append(" returning a");
@@ -251,7 +223,7 @@
         builder.append(";");
         body = builder.toString();
         BADParserFactory factory = new BADParserFactory();
-        List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
+        List<Statement> fStatements = factory.createParser(builder.toString()).parse();
 
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
@@ -265,7 +237,7 @@
 
     @Override
     public String getName() {
-        return CreateChannelStatement.class.getName();
+        return AbstractCreateChannelStatement.class.getName();
     }
 
     @Override
@@ -283,7 +255,7 @@
         subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
         resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
 
-        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName.getValue());
+        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverseName, channelName.getValue());
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -305,7 +277,7 @@
             if (alreadyActive) {
                 throw new AsterixException("Channel " + channelName + " is already running");
             }
-            initialize(mdTxnCtx);
+            initialize(statementExecutor, metadataProvider, mdTxnCtx);
 
             //check if names are available before creating anything
             if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, subscriptionsTableName) != null) {
@@ -314,11 +286,11 @@
             if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, resultsTableName) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
-            MetadataProvider tempMdProvider = MetadataProvider.create(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse());
-            tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
+            MetadataProvider tempMdProvider = BADUtils.replicateMetadataProvider(metadataProvider);
+            tempMdProvider.setMaxResultReads(requestContext.getResultProperties().getMaxReads());
             final IResultSet resultSet = requestContext.getResultSet();
             final Stats stats = requestContext.getStats();
+            tempMdProvider.getConfig().put(BADConstants.CONFIG_CHANNEL_NAME, channelName.getValue());
             //Create Channel Datasets
             createDatasets(statementExecutor, tempMdProvider, hcc);
             tempMdProvider.getLocks().reset();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 9678b46..962b0d5 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -20,7 +20,7 @@
 
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.metadata.DataverseName;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index fb4fe47..34940a9 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -92,7 +92,7 @@
             throws HyracksDataException, AlgebricksException {
         DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
         boolean txnActive = false;
-        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverse, channelName.getValue());
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 60be430..4aa1959 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -24,7 +24,8 @@
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.BADUtils;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -154,11 +155,11 @@
             Query subscriptionTuple = new Query(false);
 
             List<FieldBinding> fb = new ArrayList<>();
-            LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
+            LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.METADATA_TYPE_NAME_DATAVERSENAME));
             Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse.getCanonicalForm()));
             fb.add(new FieldBinding(leftExpr, rightExpr));
 
-            leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
+            leftExpr = new LiteralExpr(new StringLiteral(BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME));
             rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName()));
             fb.add(new FieldBinding(leftExpr, rightExpr));
 
@@ -183,9 +184,9 @@
             RecordConstructor recordCon = new RecordConstructor(fb);
             subscriptionTuple.setBody(recordCon);
             subscriptionTuple.setVarCounter(varCounter);
-            MetadataProvider tempMdProvider = MetadataProvider.create(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse());
-            tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
+
+            metadataProvider.setResultSetId(new ResultSetId(resultSetId));
+            MetadataProvider tempMdProvider = BADUtils.replicateMetadataProvider(metadataProvider);
 
             final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
             final IResultSet resultSet = requestParameters.getResultSet();
@@ -197,16 +198,10 @@
                 useResultVar.setIsNewVar(false);
                 FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));
 
-                metadataProvider.setResultSetId(new ResultSetId(resultSetId));
                 boolean resultsAsync =
                         resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
                 metadataProvider.setResultAsyncMode(resultsAsync);
-                tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
                 tempMdProvider.setResultAsyncMode(resultsAsync);
-                tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
-                tempMdProvider
-                        .setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
-                tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
                 tempMdProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
 
                 InsertStatement insert = new InsertStatement(dataverse, subscriptionsDatasetName, subscriptionTuple,
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 298919f..93c27fa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -24,7 +24,7 @@
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index 581b597..042d3dd 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -23,15 +23,19 @@
 
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.ExpressionUtils;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.object.base.AdmObjectNode;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -44,11 +48,19 @@
     private final DataverseName dataverseName;
     private final Identifier brokerName;
     private String endPointName;
+    private String brokerType;
+    private AdmObjectNode withObjectNode;
 
-    public CreateBrokerStatement(DataverseName dataverseName, Identifier brokerName, String endPointName) {
+    public CreateBrokerStatement(DataverseName dataverseName, Identifier brokerName, String endPointName,
+            RecordConstructor withRecord) throws CompilationException {
         this.brokerName = brokerName;
         this.dataverseName = dataverseName;
         this.endPointName = endPointName;
+        if (withRecord != null) {
+            this.withObjectNode = ExpressionUtils.toNode(withRecord);
+            this.brokerType = withObjectNode.getOptionalString(BADConstants.BAD_BROKER_FIELD_NAME_TYPE);
+        }
+        this.brokerType = brokerType == null ? BADConstants.GENERAL_BROKER_TYPE_NAME : brokerType.toLowerCase();
     }
 
     public String getEndPointName() {
@@ -91,7 +103,7 @@
             if (broker != null) {
                 throw new AlgebricksException("A broker with this name " + brokerName + " already exists.");
             }
-            broker = new Broker(dataverse, brokerName.getValue(), endPointName);
+            broker = new Broker(dataverse, brokerName.getValue(), endPointName, brokerType);
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, broker);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateContinuousChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateContinuousChannelStatement.java
new file mode 100644
index 0000000..fd14e27
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateContinuousChannelStatement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.exceptions.MetadataException;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
+import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.translator.IStatementExecutor;
+
+public class CreateContinuousChannelStatement extends AbstractCreateChannelStatement {
+
+    private CreateFunctionStatement stmt;
+
+    public CreateContinuousChannelStatement(DataverseName dataverseName, Identifier channelName, Expression period,
+            boolean push, CreateFunctionStatement stmt) {
+        super(dataverseName, channelName, period, push);
+        this.stmt = stmt;
+    }
+
+    @Override
+    protected void initialize(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx) throws Exception {
+        // Creates function
+        SqlppRewriterFactory fact = new SqlppRewriterFactory(new SqlppParserFactory());
+        ((QueryTranslator) statementExecutor).handleCreateFunctionStatement(metadataProvider, stmt,
+                fact.createStatementRewriter());
+        this.function = stmt.getFunctionSignature();
+
+        // Check whether function exists
+        Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
+        if (lookup == null) {
+            throw new MetadataException(" Unknown function " + function.getName());
+        }
+
+        if (period != null) {
+            if (!period.getFunctionSignature().getName().equals("duration")) {
+                throw new MetadataException("Expected argument period as a duration, but got "
+                        + period.getFunctionSignature().getName() + ".");
+            }
+            duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
+        } else {
+            duration = "";
+        }
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index c76385c..eb56e35 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -34,7 +34,7 @@
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.BADJobService;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
@@ -240,7 +240,7 @@
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         initialize();
         DataverseName dataverse = statementExecutor.getActiveDataverseName(signature.getDataverseName());
-        EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
+        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverse, signature.getName());
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Procedure procedure = null;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateRepetitiveChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateRepetitiveChannelStatement.java
new file mode 100644
index 0000000..006535c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateRepetitiveChannelStatement.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import org.apache.asterix.common.exceptions.MetadataException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+
+public class CreateRepetitiveChannelStatement extends AbstractCreateChannelStatement {
+    public CreateRepetitiveChannelStatement(DataverseName dataverseName, Identifier channelName,
+            FunctionSignature function, Expression period, boolean push) {
+        super(dataverseName, channelName, period, push);
+        this.function = function;
+    }
+
+    @Override
+    protected void initialize(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException {
+        Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
+        if (lookup == null) {
+            throw new MetadataException(" Unknown function " + function.getName());
+        }
+
+        if (!period.getFunctionSignature().getName().equals("duration")) {
+            throw new MetadataException(
+                    "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
+        }
+        duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
+        IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream outputStream = new DataOutputStream(bos);
+        durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index ca2435e..1083f8b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.BADJobService;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -109,7 +109,7 @@
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
         boolean txnActive = false;
-        EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
+        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverse, procedureName);
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure;
 
@@ -129,10 +129,11 @@
                         (QueryTranslator) statementExecutor);
 
             } else {
-                ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+                ScheduledExecutorService ses = BADJobService.createExecutorServe();
+                listener.setExecutorService(ses);
+                BADJobService.startRepetitiveDeployedJobSpec(ses, deployedJobSpecId, hcc,
                         BADJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
                         metadataProvider.getTxnIdFactory(), listener);
-                listener.setExecutorService(ses);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             txnActive = false;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 9d5c901..16c2766 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -90,7 +90,7 @@
         DataverseName dataverseName = statementExecutor.getActiveDataverseName(signature.getDataverseName());
         signature.setDataverseName(dataverseName);
         boolean txnActive = false;
-        EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, signature.getName());
+        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverseName, signature.getName());
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
 
         if (listener.isActive()) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
index 0722c48..5db98d3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 
 import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.extension.BADMetadataExtension;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
 import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
@@ -32,23 +33,23 @@
 public class BADMetadataIndexes {
 
     public static final ExtensionMetadataDatasetId BAD_CHANNEL_INDEX_ID = new ExtensionMetadataDatasetId(
-            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.CHANNEL_EXTENSION_NAME);
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.METADATA_DATASET_CHANNEL);
     public static final MetadataIndexImmutableProperties PROPERTIES_CHANNEL =
-            new MetadataIndexImmutableProperties(BADConstants.CHANNEL_EXTENSION_NAME,
+            new MetadataIndexImmutableProperties(BADConstants.METADATA_DATASET_CHANNEL,
                     MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID,
                     MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID);
 
-    public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID =
-            new ExtensionMetadataDatasetId(BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.BROKER_KEYWORD);
+    public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID = new ExtensionMetadataDatasetId(
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.METADATA_DATASET_BROKER);
     public static final MetadataIndexImmutableProperties PROPERTIES_BROKER =
-            new MetadataIndexImmutableProperties(BADConstants.BROKER_KEYWORD,
+            new MetadataIndexImmutableProperties(BADConstants.METADATA_DATASET_BROKER,
                     MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
                     MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
 
     public static final ExtensionMetadataDatasetId BAD_PROCEDURE_INDEX_ID = new ExtensionMetadataDatasetId(
-            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.PROCEDURE_KEYWORD);
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.METADATA_DATASET_PROCEDURE);
     public static final MetadataIndexImmutableProperties PROPERTIES_PROCEDURE =
-            new MetadataIndexImmutableProperties(BADConstants.PROCEDURE_KEYWORD,
+            new MetadataIndexImmutableProperties(BADConstants.METADATA_DATASET_PROCEDURE,
                     MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2,
                     MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2);
 
@@ -66,7 +67,7 @@
     public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset<>(PROPERTIES_BROKER,
             NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
             Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
-                    Arrays.asList(BADConstants.BrokerName)),
+                    Arrays.asList(BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME)),
             0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
             (IMetadataEntityTupleTranslatorFactory<Broker>) BrokerTupleTranslator::new);
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
new file mode 100644
index 0000000..290db49
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.bad.feed.operators.BADLSMPrimaryInsertOperatorDescriptor;
+import org.apache.asterix.bad.feed.operators.BADLSMPrimaryUpsertOperatorDescriptor;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+
+public class BADMetadataProvider extends MetadataProvider {
+
+    public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) {
+        MetadataProvider mp = new BADMetadataProvider(appCtx);
+        mp.setDefaultDataverse(defaultDataverse);
+        return mp;
+    }
+
+    protected BADMetadataProvider(ICcApplicationContext appCtx) {
+        super(appCtx);
+    }
+
+    @Override
+    protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
+            RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
+            IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields) {
+        return new BADLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
+    }
+
+    // This method uses a static method from DatasetUtil, so it cannot be further simplified.
+    @Override
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(JobSpecification spec,
+            MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
+            int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
+        int numKeys = dataset.getPrimaryKeys().size();
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
+        ARecordType itemType = (ARecordType) metadataProvider.findType(dataset);
+        ARecordType metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
+        Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                dataset.getDatasetName());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset);
+
+        // prepare callback
+        int[] primaryKeyFields = new int[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            primaryKeyFields[i] = i;
+        }
+        boolean hasSecondaries =
+                metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
+        ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
+        IIndexDataflowHelperFactory idfh =
+                new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+        LSMPrimaryUpsertOperatorDescriptor op;
+        ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + 1
+                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + 1
+                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        IDataFormat dataFormat = metadataProvider.getDataFormat();
+
+        int f = 0;
+        // add the upsert indicator var
+        outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN);
+        outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN);
+        f++;
+        // add the previous record
+        outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
+        outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType);
+        f++;
+        // add the previous meta second
+        if (dataset.hasMetaPart()) {
+            outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(metaItemType);
+            f++;
+        }
+        // add the previous filter third
+        int fieldIdx = -1;
+        if (numFilterFields > 0) {
+            String filterField = DatasetUtil.getFilterField(dataset).get(0);
+            String[] fieldNames = itemType.getFieldNames();
+            int i = 0;
+            for (; i < fieldNames.length; i++) {
+                if (fieldNames[i].equals(filterField)) {
+                    break;
+                }
+            }
+            fieldIdx = i;
+            outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] =
+                    dataFormat.getSerdeProvider().getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            f++;
+        }
+        for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+            outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+            outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+        }
+        RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+        op = new BADLSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
+                missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
+                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries);
+        return new Pair<>(op, splitsAndConstraint.second);
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index a764a5a..0c11398 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -28,11 +28,11 @@
 public class BADMetadataRecordTypes {
 
     // -------------------------------------- Subscriptions --------------------------------------//
-    private static final String[] subTypeFieldNames =
-            { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.SubscriptionId };
+    private static final String[] subTypeFieldNames = { BADConstants.METADATA_TYPE_NAME_DATAVERSENAME,
+            BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME, BADConstants.SubscriptionId };
     private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
     public static final ARecordType channelSubscriptionsType =
-            new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true);
+            new ARecordType(BADConstants.METADATA_TYPENAME_SUBSCRIPTIONS, subTypeFieldNames, subTypeFieldTypes, true);
 
     // ---------------------------------------- Results --------------------------------------------//
     private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
@@ -53,11 +53,11 @@
     public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
     public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
-            BADConstants.RECORD_TYPENAME_CHANNEL,
+            BADConstants.METADATA_TYPENAME_CHANNEL,
             // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
-                    BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
+            new String[] { BADConstants.METADATA_TYPE_NAME_DATAVERSENAME, BADConstants.ChannelName,
+                    BADConstants.SubscriptionsDatasetName, BADConstants.ResultsDatasetName, BADConstants.Function,
+                    BADConstants.Duration, BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
@@ -70,14 +70,18 @@
     public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int BROKER_NAME_FIELD_INDEX = 1;
     public static final int BROKER_ENDPOINT_FIELD_INDEX = 2;
+    public static final int BROKER_TYPE_FIELD_INDEX = 3;
     public static final ARecordType BROKER_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
-            BADConstants.RECORD_TYPENAME_BROKER,
+            BADConstants.METADATA_TYPENAME_BROKER,
             // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.BrokerEndPoint },
+            new String[] { BADConstants.METADATA_TYPE_NAME_DATAVERSENAME,
+                    BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME,
+                    BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT,
+                    BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    BuiltinType.ASTRING, BuiltinType.ASTRING },
+                    BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
             //IsOpen?
             true);
 
@@ -93,11 +97,12 @@
     public static final int PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX = 8;
     public static final ARecordType PROCEDURE_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
-            BADConstants.RECORD_TYPENAME_PROCEDURE,
+            BADConstants.METADATA_TYPENAME_PROCEDURE,
             // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY,
-                    BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_TYPE, BADConstants.FIELD_NAME_DEFINITION,
-                    BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration, BADConstants.FIELD_NAME_DEPENDENCIES },
+            new String[] { BADConstants.METADATA_TYPE_NAME_DATAVERSENAME, BADConstants.ProcedureName,
+                    BADConstants.FIELD_NAME_ARITY, BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_TYPE,
+                    BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration,
+                    BADConstants.FIELD_NAME_DEPENDENCIES },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -107,4 +112,15 @@
             //IsOpen?
             true);
 
+    //------------------------------------------ Active Timestamp ----------------------------------------//
+    public static final ARecordType ACTIVE_RECORD_RECORD_TYPE = MetadataRecordTypes.createRecordType(
+            // RecordTypeName
+            BADConstants.RECORD_TYPENAME_ACTIVE_RECORD,
+            // FieldNames
+            new String[] { BADConstants.FIELD_NAME_ACTIVE_TS },
+            // FieldTypes
+            new IAType[] { BuiltinType.ADATETIME },
+            //IsOpen?
+            true);
+
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
index feabf3f..ef6cfbc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
@@ -29,11 +29,13 @@
     private final DataverseName dataverseName;
     private final String brokerName;
     private final String endPointName;
+    private final String brokerType;
 
-    public Broker(DataverseName dataverseName, String brokerName, String endPointName) {
+    public Broker(DataverseName dataverseName, String brokerName, String endPointName, String brokerType) {
         this.endPointName = endPointName;
         this.dataverseName = dataverseName;
         this.brokerName = brokerName;
+        this.brokerType = brokerType;
     }
 
     public DataverseName getDataverseName() {
@@ -48,6 +50,10 @@
         return endPointName;
     }
 
+    public String getBrokerType() {
+        return brokerType;
+    }
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {
@@ -67,4 +73,5 @@
     public ExtensionMetadataDatasetId getDatasetId() {
         return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
     }
+
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
index 1114d06..d71a1db 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -50,8 +50,10 @@
                 ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX)).getStringValue();
         String endPointName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX))
                 .getStringValue();
+        String brokerType =
+                ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_TYPE_FIELD_INDEX)).getStringValue();
 
-        return new Broker(dataverseName, brokerName, endPointName);
+        return new Broker(dataverseName, brokerName, endPointName, brokerType);
     }
 
     @Override
@@ -89,6 +91,12 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX, fieldValue);
 
+        // write field 3
+        fieldValue.reset();
+        aString.setValue(broker.getBrokerType());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.BROKER_TYPE_FIELD_INDEX, fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index 9c5a1f0..a203468 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -44,16 +44,16 @@
     /*
     Dependencies are stored as an array of size two:
     element 0 is a list of dataset dependencies
-    -stored as triples of [DataverseName, Dataset, null] for the datasets
+    -stored as triples of [METADATA_TYPE_NAME_DATAVERSENAME, Dataset, null] for the datasets
     element 1 is a list of function dependencies
-    -stored as triples of [DataverseName, FunctionName, Arity] for the functions
+    -stored as triples of [METADATA_TYPE_NAME_DATAVERSENAME, FunctionName, Arity] for the functions
     */
     private final List<List<Triple<DataverseName, String, String>>> dependencies;
 
     public Channel(DataverseName dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
             FunctionSignature function, String duration, List<List<Triple<DataverseName, String, String>>> dependencies,
             String channelBody) {
-        this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.channelId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index c989611..fcf5549 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -42,16 +42,16 @@
     /*
     Dependencies are stored as an array of size two:
     element 0 is a list of dataset dependencies
-    -stored as triples of [DataverseName, Dataset, null] for the datasets
+    -stored as triples of [METADATA_TYPE_NAME_DATAVERSENAME, Dataset, null] for the datasets
     element 1 is a list of function dependencies
-    -stored as triples of [DataverseName, FunctionName, Arity] for the functions
+    -stored as triples of [METADATA_TYPE_NAME_DATAVERSENAME, FunctionName, Arity] for the functions
      */
     private final List<List<Triple<DataverseName, String, String>>> dependencies;
 
     public Procedure(DataverseName dataverseName, String functionName, int arity, List<String> params, String type,
             String functionBody, String language, String duration,
             List<List<Triple<DataverseName, String, String>>> dependencies) {
-        this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
+        this.procedureId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverseName, functionName);
         this.params = params;
         this.body = functionBody;
         this.type = type;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index b7f316e..b7d2f54 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -35,9 +35,9 @@
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.RequestParameters;
 import org.apache.asterix.bad.BADJobService;
+import org.apache.asterix.bad.extension.BADLangExtension;
 import org.apache.asterix.bad.lang.BADCompilationProvider;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.lang.BADStatementExecutor;
+import org.apache.asterix.bad.lang.BADQueryTranslator;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
@@ -96,8 +96,8 @@
         SessionConfig sessionConfig =
                 new SessionConfig(SessionConfig.OutputFormat.ADM, true, true, true, SessionConfig.PlanFormat.STRING);
         final SessionOutput sessionOutput = new SessionOutput(sessionConfig, null);
-        BADStatementExecutor badStatementExecutor =
-                new BADStatementExecutor(appCtx, new ArrayList<>(), sessionOutput, new BADCompilationProvider(),
+        BADQueryTranslator badStatementExecutor =
+                new BADQueryTranslator(appCtx, new ArrayList<>(), sessionOutput, new BADCompilationProvider(),
                         Executors.newSingleThreadExecutor(
                                 new HyracksThreadFactory(DefaultStatementExecutorFactory.class.getSimpleName())),
                         new ResponsePrinter(sessionOutput));
@@ -133,10 +133,11 @@
                     new RequestParameters(requestReference, null, null, null, null, null, null, null, null, null, true),
                     true);
 
-            ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(),
-                    hcc, BADJobService.findPeriod(channel.getDuration()), new HashMap<>(), entityId,
-                    metadataProvider.getTxnIdFactory(), listener);
+            ScheduledExecutorService ses = BADJobService.createExecutorServe();
             listener.setExecutorService(ses);
+            BADJobService.startRepetitiveDeployedJobSpec(ses, listener.getDeployedJobSpecId(), hcc,
+                    BADJobService.findPeriod(channel.getDuration()), new HashMap<>(), entityId,
+                    metadataProvider.getTxnIdFactory(), listener);
             metadataProvider.getLocks().unlock();
 
             LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 6a0b31c..41393a4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -24,8 +24,8 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
-import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
+import org.apache.asterix.bad.runtime.operators.NotifyBrokerOperator;
+import org.apache.asterix.bad.runtime.operators.NotifyBrokerPOperator;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
@@ -133,6 +133,8 @@
 
         //Now we need to get the broker EndPoint
         LogicalVariable brokerEndpointVar = context.newVar();
+        LogicalVariable brokerTypeVar = context.newVar();
+
         AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
         if (opAboveBrokersScan == null) {
             return false;
@@ -155,8 +157,10 @@
             ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
         }
 
-        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
-        //now brokerNameVar holds the brokerName for use farther up in the plan
+        AssignOperator assignOp =
+                createbrokerEndPointAssignOperator(brokerEndpointVar, brokerTypeVar, opAboveBrokersScan);
+
+        visitGroupBy(op, assignOp, brokerEndpointVar, brokerTypeVar);
 
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
         context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
@@ -166,25 +170,46 @@
         badProject.getVariables().add(subscriptionIdVar);
         badProject.getVariables().add(brokerEndpointVar);
         badProject.getVariables().add(channelExecutionVar);
+        badProject.getVariables().add(brokerTypeVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
-                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
-                        (DistributeResultOperator) op1, channelDataverse, channelName);
+                        brokerTypeVar, context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
+                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, brokerTypeVar,
+                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
 
         return true;
     }
 
+    private boolean visitGroupBy(ILogicalOperator currOp, ILogicalOperator brokerAssignOp, LogicalVariable endpointVar,
+            LogicalVariable typeVar) {
+        // this method makes sure even the broker information is not projected out
+        boolean containsBroker = false;
+        if (currOp == brokerAssignOp) {
+            return true;
+        } else {
+            for (Mutable<ILogicalOperator> input : currOp.getInputs()) {
+                containsBroker = containsBroker || visitGroupBy(input.getValue(), brokerAssignOp, endpointVar, typeVar);
+            }
+        }
+        if (currOp.getOperatorTag() == LogicalOperatorTag.GROUP && containsBroker) {
+            GroupByOperator groupByOperator = (GroupByOperator) currOp;
+            groupByOperator.addDecorExpression(null, new VariableReferenceExpression(endpointVar));
+            groupByOperator.addDecorExpression(null, new VariableReferenceExpression(typeVar));
+        }
+        return containsBroker;
+    }
+
     private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
-            LogicalVariable channelExecutionVar, DataverseName channelDataverse, String channelName, boolean push) {
+            LogicalVariable channelExecutionVar, LogicalVariable brokerTypeVar, DataverseName channelDataverse,
+            String channelName, boolean push) {
         NotifyBrokerOperator notifyBrokerOp =
-                new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push);
-        EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
+                new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, brokerTypeVar, push);
+        EntityId activeId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, channelDataverse, channelName);
         NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
         notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
         DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
@@ -193,9 +218,9 @@
     }
 
     private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
-            LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
-            DistributeResultOperator distributeOp, DataverseName channelDataverse, String channelName)
-            throws AlgebricksException {
+            LogicalVariable channelExecutionVar, LogicalVariable brokerTypeVar, IOptimizationContext context,
+            ILogicalOperator eOp, DistributeResultOperator distributeOp, DataverseName channelDataverse,
+            String channelName) throws AlgebricksException {
         //Find the assign operator to get the result type that we need
         AbstractLogicalOperator assign = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
         while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
@@ -203,8 +228,8 @@
         }
 
         //Create the NotifyBrokerOperator
-        DelegateOperator extensionOp =
-                createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse, channelName, true);
+        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, brokerTypeVar,
+                channelDataverse, channelName, true);
 
         extensionOp.getInputs().add(new MutableObject<>(eOp));
         context.computeAndSetTypeEnvironmentForOperator(extensionOp);
@@ -214,9 +239,9 @@
     }
 
     private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
-            LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
-            DistributeResultOperator distributeOp, DataverseName channelDataverse, String channelName)
-            throws AlgebricksException {
+            LogicalVariable channelExecutionVar, LogicalVariable brokerTypeVar, IOptimizationContext context,
+            ILogicalOperator eOp, DistributeResultOperator distributeOp, DataverseName channelDataverse,
+            String channelName) throws AlgebricksException {
 
         //Create the Distinct Op
         ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<>();
@@ -231,6 +256,7 @@
         //Create GroupBy operator
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
         groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerTypeVar));
         groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
 
         //Set the distinct as input
@@ -254,7 +280,7 @@
 
         //Create the NotifyBrokerOperator
         DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
-                channelDataverse, channelName, false);
+                brokerTypeVar, channelDataverse, channelName, false);
 
         //Set the input for the distinct as the old top
         extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
@@ -262,9 +288,9 @@
 
         //compute environment bottom up
         context.computeAndSetTypeEnvironmentForOperator(distinctOp);
-        context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
         context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
         context.computeAndSetTypeEnvironmentForOperator(listifyOp);
+        context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
         context.computeAndSetTypeEnvironmentForOperator(extensionOp);
 
         return extensionOp;
@@ -272,9 +298,11 @@
     }
 
     private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
-            AbstractLogicalOperator opAboveBrokersScan) {
-        Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
-                new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
+            LogicalVariable brokerTypeVar, AbstractLogicalOperator opAboveBrokersScan) {
+        Mutable<ILogicalExpression> endpointFieldName = new MutableObject<ILogicalExpression>(new ConstantExpression(
+                new AsterixConstantValue(new AString(BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT))));
+        Mutable<ILogicalExpression> brokerTypeFieldName = new MutableObject<ILogicalExpression>(new ConstantExpression(
+                new AsterixConstantValue(new AString(BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE))));
         DataSourceScanOperator brokerScan = null;
         int index = 0;
         for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
@@ -287,18 +315,24 @@
         Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
                 new VariableReferenceExpression(brokerScan.getVariables().get(2)));
 
-        ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
-                FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
-        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
+        ScalarFunctionCallExpression brokerEndpointFieldAccessor = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, endpointFieldName);
+        ScalarFunctionCallExpression brokerTYpeFieldAccessor = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, brokerTypeFieldName);
+
+        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(2);
         varArray.add(brokerEndpointVar);
-        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
-        exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
+        varArray.add(brokerTypeVar);
+
+        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(2);
+        exprArray.add(new MutableObject<>(brokerEndpointFieldAccessor));
+        exprArray.add(new MutableObject<>(brokerTYpeFieldAccessor));
 
         AssignOperator assignOp = new AssignOperator(varArray, exprArray);
 
         //Place assignOp between the scan and the op above it
-        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
-        opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
+        assignOp.getInputs().add(new MutableObject<>(brokerScan));
+        opAboveBrokersScan.getInputs().set(index, new MutableObject<>(assignOp));
 
         return assignOp;
     }
@@ -313,40 +347,18 @@
             return null;
         }
         for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
-            if (lookingForString.equals("brokers")) {
-                if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
-                    return op;
-                } else {
-                    AbstractLogicalOperator nestedOp =
-                            findOp((AbstractLogicalOperator) subOp.getValue(), lookingForString);
-                    if (nestedOp != null) {
-                        return nestedOp;
-                    }
+            if (lookingForString.equals("brokers") && isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                return op;
+            } else if (lookingForString.equals("project")
+                    && subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                return (AbstractLogicalOperator) subOp.getValue();
+            } else if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+                return (AbstractLogicalOperator) subOp.getValue();
+            } else {
+                AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), lookingForString);
+                if (nestedOp != null) {
+                    return nestedOp;
                 }
-
-            } else if (lookingForString.equals("project")) {
-                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
-                    return (AbstractLogicalOperator) subOp.getValue();
-                } else {
-                    AbstractLogicalOperator nestedOp =
-                            findOp((AbstractLogicalOperator) subOp.getValue(), lookingForString);
-                    if (nestedOp != null) {
-                        return nestedOp;
-                    }
-                }
-            }
-
-            else {
-                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
-                    return (AbstractLogicalOperator) subOp.getValue();
-                } else {
-                    AbstractLogicalOperator nestedOp =
-                            findOp((AbstractLogicalOperator) subOp.getValue(), lookingForString);
-                    if (nestedOp != null) {
-                        return nestedOp;
-                    }
-                }
-
             }
         }
         return null;
@@ -356,10 +368,8 @@
         if (op instanceof DataSourceScanOperator) {
             if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
-                if (dds.getDataset().getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
-                        && dds.getDataset().getDatasetName().equals("Broker")) {
-                    return true;
-                }
+                return dds.getDataset().getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
+                        && dds.getDataset().getDatasetName().equals(BADConstants.METADATA_DATASET_BROKER);
             }
         }
         return false;
@@ -370,10 +380,8 @@
             if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
                 if (dds.getDataset().getItemTypeDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
-                        && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
-                    if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
-                        return true;
-                    }
+                        && dds.getDataset().getItemTypeName().equals(BADConstants.METADATA_TYPENAME_SUBSCRIPTIONS)) {
+                    return subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName);
                 }
             }
         }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
new file mode 100644
index 0000000..ec922e7
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
@@ -0,0 +1,213 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.function.BADFunctions;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class RewriteChannelTimeFunctionToLocalVarRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        Mutable<ILogicalExpression> exprRef;
+        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
+            InnerJoinOperator selectOp = (InnerJoinOperator) opRef.getValue();
+            exprRef = selectOp.getCondition();
+        } else if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT) {
+            SelectOperator selectOp = (SelectOperator) opRef.getValue();
+            exprRef = selectOp.getCondition();
+        } else {
+            return false;
+        }
+
+        Set<Mutable<ILogicalExpression>> activeFunctionSet = new HashSet<>();
+        Set<LogicalVariable> needPrevDsSet = new HashSet<>();
+        Set<LogicalVariable> needCurrDsSet = new HashSet<>();
+        Set<LogicalVariable> needActiveDsSet = new HashSet<>();
+
+        // collect active functions
+        collectChannelTimeFunctions(exprRef, activeFunctionSet, needPrevDsSet, needCurrDsSet, needActiveDsSet);
+        if (activeFunctionSet.size() == 0) {
+            return false;
+        }
+
+        // add assigns for active functions
+        Map<LogicalVariable, LogicalVariable> prevMap = new HashMap<>();
+        Map<LogicalVariable, LogicalVariable> currMap = new HashMap<>();
+        Map<LogicalVariable, LogicalVariable> activeMap = new HashMap<>();
+        createChannelTimeAssignOps(opRef, needPrevDsSet, needCurrDsSet, needActiveDsSet, prevMap, currMap, activeMap,
+                context);
+
+        // update expressions with new vars
+        updateActiveFuncExprsWithVars(prevMap, currMap, activeMap, activeFunctionSet);
+
+        context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+        return true;
+    }
+
+    private void updateActiveFuncExprsWithVars(Map<LogicalVariable, LogicalVariable> prevMap,
+            Map<LogicalVariable, LogicalVariable> currMap, Map<LogicalVariable, LogicalVariable> activeMap,
+            Set<Mutable<ILogicalExpression>> activeFuncExprs) {
+
+        for (Mutable<ILogicalExpression> expr : activeFuncExprs) {
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr.getValue();
+            LogicalVariable dsVar = ((VariableReferenceExpression) ((AbstractFunctionCallExpression) expr.getValue())
+                    .getArguments().get(0).getValue()).getVariableReference();
+            if (funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
+                expr.setValue(new VariableReferenceExpression(currMap.get(dsVar)));
+            } else if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
+                expr.setValue(new VariableReferenceExpression(prevMap.get(dsVar)));
+            } else {
+                ILogicalExpression lessThanExpr =
+                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.LT),
+                                new MutableObject<>(new VariableReferenceExpression(activeMap.get(dsVar))),
+                                new MutableObject<>(new VariableReferenceExpression(currMap.get(dsVar))));
+                ILogicalExpression greaterThanExpr =
+                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.GT),
+                                new MutableObject<>(new VariableReferenceExpression(activeMap.get(dsVar))),
+                                new MutableObject<>(new VariableReferenceExpression(prevMap.get(dsVar))));
+                ScalarFunctionCallExpression andExpr =
+                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.AND),
+                                new MutableObject<>(lessThanExpr), new MutableObject<>(greaterThanExpr));
+                expr.setValue(andExpr);
+            }
+        }
+    }
+
+    private void createChannelTimeAssignOps(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> needPrevDsSet,
+            Set<LogicalVariable> needCurrDsSet, Set<LogicalVariable> needActiveDsSet,
+            Map<LogicalVariable, LogicalVariable> prevMap, Map<LogicalVariable, LogicalVariable> currMap,
+            Map<LogicalVariable, LogicalVariable> activeMap, IOptimizationContext context) {
+        ILogicalOperator currOp = opRef.getValue();
+        String channelName =
+                (String) context.getMetadataProvider().getConfig().getOrDefault(BADConstants.CONFIG_CHANNEL_NAME, "");
+        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+            DataSourceScanOperator dataScanOp = (DataSourceScanOperator) opRef.getValue();
+            DataSource ds = (DataSource) dataScanOp.getDataSource();
+            LogicalVariable dsVar = ds.getDataRecordVariable(dataScanOp.getScanVariables());
+
+            if (needPrevDsSet.contains(dsVar) || needActiveDsSet.contains(dsVar)) {
+                LogicalVariable channelTimeVar = context.newVar();
+                ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
+                        BuiltinFunctions.getAsterixFunctionInfo(BADFunctions.PREVIOUS_CHANNEL_TIME),
+                        new MutableObject<>(
+                                new ConstantExpression(new AsterixConstantValue(new AString(channelName)))));
+                AssignOperator assignOp =
+                        new AssignOperator(channelTimeVar, new MutableObject<>(previousChannelTimeExpr));
+                assignOp.getInputs().add(new MutableObject<>(opRef.getValue()));
+                opRef.setValue(assignOp);
+                prevMap.put(dsVar, channelTimeVar);
+            }
+
+            if (needCurrDsSet.contains(dsVar) || needActiveDsSet.contains(dsVar)) {
+                LogicalVariable channelTimeVar = context.newVar();
+                ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
+                        BuiltinFunctions.getAsterixFunctionInfo(BADFunctions.CURRENT_CHANNEL_TIME), new MutableObject<>(
+                                new ConstantExpression(new AsterixConstantValue(new AString(channelName)))));
+                AssignOperator assignOp =
+                        new AssignOperator(channelTimeVar, new MutableObject<>(previousChannelTimeExpr));
+                assignOp.getInputs().add(new MutableObject<>(opRef.getValue()));
+                opRef.setValue(assignOp);
+                currMap.put(dsVar, channelTimeVar);
+            }
+
+            if (needActiveDsSet.contains(dsVar)) {
+                LogicalVariable channelTimeVar = context.newVar();
+
+                ILogicalExpression activeTsFunc =
+                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
+                                new MutableObject<>(new VariableReferenceExpression(dsVar)));
+                ScalarFunctionCallExpression faExpr = new ScalarFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
+                        new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
+                                new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
+                AssignOperator assignOp = new AssignOperator(channelTimeVar, new MutableObject<>(faExpr));
+                assignOp.getInputs().add(new MutableObject<>(opRef.getValue()));
+                opRef.setValue(assignOp);
+                activeMap.put(dsVar, channelTimeVar);
+            }
+        }
+        for (Mutable<ILogicalOperator> input : currOp.getInputs()) {
+            createChannelTimeAssignOps(input, needPrevDsSet, needCurrDsSet, needActiveDsSet, prevMap, currMap,
+                    activeMap, context);
+        }
+    }
+
+    private void collectChannelTimeFunctions(Mutable<ILogicalExpression> exprRef,
+            Set<Mutable<ILogicalExpression>> activeFunctionSet, Set<LogicalVariable> needPrevDsSet,
+            Set<LogicalVariable> needCurrDsSet, Set<LogicalVariable> needActive) {
+        if (exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) exprRef.getValue();
+            if (funcExpr.getFunctionIdentifier() == BADFunctions.IS_NEW
+                    || funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME
+                    || funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
+                // add to active func set for later replacement
+                activeFunctionSet.add(exprRef);
+                // collect ds var to see what assign op needs to be added
+                LogicalVariable dsVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+                        .getVariableReference();
+                if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
+                    needPrevDsSet.add(dsVar);
+                } else if (funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
+                    needCurrDsSet.add(dsVar);
+                } else {
+                    needActive.add(dsVar);
+                }
+            } else {
+                for (Mutable<ILogicalExpression> argExpr : funcExpr.getArguments()) {
+                    collectChannelTimeFunctions(argExpr, activeFunctionSet, needPrevDsSet, needCurrDsSet, needActive);
+                }
+            }
+        }
+    }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
new file mode 100644
index 0000000..adbbd03
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
@@ -0,0 +1,97 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.job.JobId;
+
+/*
+ * This only supports one channel currently. Can be extended to multiple channels.
+ * */
+public class ActiveTimestampManager {
+
+    static class ChannelTimeState {
+        long previousChannelExecutionTimestamp;
+        long currentChannelExecutionTimestamp;
+        JobId maxJobId;
+
+        public ChannelTimeState(long prev, long curr, JobId jobId) {
+            this.previousChannelExecutionTimestamp = prev;
+            this.currentChannelExecutionTimestamp = curr;
+            this.maxJobId = jobId;
+        }
+    }
+
+    private static Map<String, ChannelTimeState> channelTimeStateMap = new ConcurrentHashMap<>();
+    private static Logger LOGGER = Logger.getLogger(ActiveTimestampManager.class.getName());
+
+    public static synchronized boolean progressChannelExecutionTimestamps(JobId jobId, String channelName,
+            String nodeId) {
+        if (channelName.equals("")) {
+            return false;
+        }
+        // In distributed cases, channel name would be sufficient. Since in BADExecutionTest, all nodes share the same
+        // JVM, we would need to add the node id as part of the key
+        String channelTimeKey = nodeId + channelName;
+        if (channelTimeStateMap.containsKey(channelTimeKey)) {
+            ChannelTimeState state = channelTimeStateMap.get(channelTimeKey);
+            if (state.maxJobId.compareTo(jobId) < 0) {
+                state.previousChannelExecutionTimestamp = state.currentChannelExecutionTimestamp;
+                state.currentChannelExecutionTimestamp = System.currentTimeMillis();
+                state.maxJobId = jobId;
+                LOGGER.log(Level.FINE, "CHN TS UPD " + channelName + " at " + jobId + " on " + nodeId + " "
+                        + state.previousChannelExecutionTimestamp + "  -  " + state.currentChannelExecutionTimestamp);
+                // System.err.println("CHN TS UPD " + channelName + " at " + jobId + " on " + nodeId + " "
+                //         + state.previousChannelExecutionTimestamp + "  -  " + state.currentChannelExecutionTimestamp);
+                return true;
+            }
+        } else {
+            LOGGER.log(Level.FINE,
+                    "CHN TS INIT " + channelName + " at " + jobId + " 0 - " + System.currentTimeMillis());
+            // System.err.println("CHN TS INIT " + channelName + " at " + jobId + " on node " + nodeId + " 0 - "
+            //         + System.currentTimeMillis());
+            channelTimeStateMap.put(channelTimeKey, new ChannelTimeState(0, System.currentTimeMillis(), jobId));
+        }
+        return false;
+    }
+
+    public static long getPreviousChannelExecutionTimestamp(String channelName, String nodeId) {
+        String channelTimeKey = nodeId + channelName;
+        if (channelTimeStateMap.containsKey(channelTimeKey)) {
+            return channelTimeStateMap.get(channelTimeKey).previousChannelExecutionTimestamp;
+        } else {
+            return 0;
+        }
+    }
+
+    public static long getCurrentChannelExecutionTimestamp(String channelName, String nodeId) {
+        String channelTimeKey = nodeId + channelName;
+        if (channelTimeStateMap.containsKey(channelTimeKey)) {
+            return channelTimeStateMap.get(channelTimeKey).currentChannelExecutionTimestamp;
+        } else {
+            return System.currentTimeMillis();
+        }
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
new file mode 100644
index 0000000..ad7e424
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.bad.runtime;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+
+public class ActiveTimestampState extends AbstractStateObject {
+
+    private long previousChannelExecutionTime = -1;
+    private long currentChannelExecutionTime = -1;
+
+    public ActiveTimestampState(JobId jobId, Object id) {
+        super(jobId, id);
+    }
+
+    public void setExecutionTime(long prev, long curr) {
+        previousChannelExecutionTime = prev;
+        currentChannelExecutionTime = curr;
+    }
+
+    public long getCurrentChannelExecutionTime() {
+        return currentChannelExecutionTime;
+    }
+
+    public long getPreviousChannelExecutionTime() {
+        return previousChannelExecutionTime;
+    }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerOperator.java
similarity index 86%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerOperator.java
index 5fc9b22..5425e71 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerOperator.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,7 +14,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.bad.runtime;
+package org.apache.asterix.bad.runtime.operators;
 
 import java.util.Collection;
 
@@ -33,13 +31,15 @@
     private final LogicalVariable brokerEndpointVar;
     private final LogicalVariable channelExecutionVar;
     private final LogicalVariable pushListVar;
+    private final LogicalVariable brokerTypeVar;
     private final boolean push;
 
     public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar,
-            LogicalVariable resultSetVar, boolean push) {
+            LogicalVariable resultSetVar, LogicalVariable brokerTypeVar, boolean push) {
         this.brokerEndpointVar = brokerEndpointVar;
         this.channelExecutionVar = resultSetVar;
         this.pushListVar = pushListVar;
+        this.brokerTypeVar = brokerTypeVar;
         this.push = push;
     }
 
@@ -55,6 +55,10 @@
         return channelExecutionVar;
     }
 
+    public LogicalVariable getBrokerTypeVar() {
+        return brokerTypeVar;
+    }
+
     public boolean getPush() {
         return push;
     }
@@ -62,7 +66,7 @@
     @Override
     public String toString() {
         return "notify-brokers (" + brokerEndpointVar.toString() + "," + channelExecutionVar.toString() + ","
-                + pushListVar.toString() + ")";
+                + pushListVar.toString() + "," + brokerTypeVar + ")";
     }
 
     @Override
@@ -72,7 +76,7 @@
 
     @Override
     public IOperatorDelegate newInstance() {
-        return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push);
+        return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, brokerTypeVar, push);
     }
 
     @Override
@@ -86,6 +90,7 @@
         usedVars.add(pushListVar);
         usedVars.add(brokerEndpointVar);
         usedVars.add(channelExecutionVar);
+        usedVars.add(brokerTypeVar);
     }
 
     @Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerPOperator.java
similarity index 92%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerPOperator.java
index 13cd166..c063f8e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerPOperator.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,7 +15,7 @@
  * under the License.
  */
 
-package org.apache.asterix.bad.runtime;
+package org.apache.asterix.bad.runtime.operators;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.om.types.IAType;
@@ -79,6 +77,7 @@
         LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
         LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
         LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+        LogicalVariable brokerTypeVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerTypeVar();
 
         IVariableTypeEnvironment env = context.getTypeEnvironment(op.getInputs().get(0).getValue());
         IAType recordType = (IAType) env.getVarType(pushListVar);
@@ -88,13 +87,15 @@
         int brokerColumn = inputSchemas[0].findVariable(brokerVar);
         int pushColumn = inputSchemas[0].findVariable(pushListVar);
         int executionColumn = inputSchemas[0].findVariable(executionVar);
+        int brokerTypeColumn = inputSchemas[0].findVariable(brokerTypeVar);
 
         IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
         IScalarEvaluatorFactory pushListEvalFactory = new ColumnAccessEvalFactory(pushColumn);
         IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
+        IScalarEvaluatorFactory brokerTypeEvalFactory = new ColumnAccessEvalFactory(brokerTypeColumn);
 
         NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory,
-                channelExecutionEvalFactory, entityId, push, recordType);
+                channelExecutionEvalFactory, brokerTypeEvalFactory, entityId, push, recordType);
 
         RecordDescriptor recDesc =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntime.java
similarity index 65%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntime.java
index 69fb7d4..14d1500 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntime.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,7 +15,7 @@
  * under the License.
  */
 
-package org.apache.asterix.bad.runtime;
+package org.apache.asterix.bad.runtime.operators;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -31,13 +29,14 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
-import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
@@ -70,45 +69,55 @@
     private static final AStringSerializerDeserializer stringSerDes =
             new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
 
-    private final IPrinter recordPrinterFactory;
+    private final IPrinter jsonRecordPrinter;
+    private final IPrinter admRecordPrinter;
     private final IPrinter subscriptionIdListPrinterFactory;
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
     private IPointable inputArg2 = new VoidPointable();
+    private IPointable inputArg3 = new VoidPointable();
     private IScalarEvaluator eval0;
     private IScalarEvaluator eval1;
     private IScalarEvaluator eval2;
+    private IScalarEvaluator eval3;
+
     private final EntityId entityId;
     private final boolean push;
     private final Map<String, String> sendData = new HashMap<>();
     private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
     private final Map<String, PrintStream> sendStreams = new HashMap<>();
-    private String executionTimeString;
-    private boolean firstResult = true;
-    String endpoint;
+    private final Map<String, String> brokerTypes = new HashMap<>();
+    private Long executionTimeMili = (long) -1;
 
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
             IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
-            EntityId activeJobId, boolean push, IAType recordType) throws HyracksDataException {
+            IScalarEvaluatorFactory brokerTypeEvalFactory, EntityId activeJobId, boolean push, IAType recordType)
+            throws HyracksDataException {
         this.tRef = new FrameTupleReference();
         IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
         eval0 = brokerEvalFactory.createScalarEvaluator(evalCtx);
         eval1 = pushListEvalFactory.createScalarEvaluator(evalCtx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(evalCtx);
+        eval3 = brokerTypeEvalFactory.createScalarEvaluator(evalCtx);
+
         this.entityId = activeJobId;
         this.push = push;
         if (push) {
             //for push-based channel, the recordType is the result record type (records are sent directly)
-            recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
+            jsonRecordPrinter =
+                    new org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory(
+                            (ARecordType) recordType).createPrinter();
+            admRecordPrinter = new org.apache.asterix.dataflow.data.nontagged.printers.adm.ARecordPrinterFactory(
+                    (ARecordType) recordType).createPrinter();
         } else {
             //for pull-based channels, the recordType is a list of subscription ids
             //the subscriptionIdListPrinterFactory is used instead
-            recordPrinterFactory = null;
+            jsonRecordPrinter = null;
+            admRecordPrinter = null;
         }
         subscriptionIdListPrinterFactory =
                 new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
-        executionTimeString = null;
     }
 
     @Override
@@ -117,48 +126,46 @@
     }
 
     public String createData(String endpoint) {
-        String resultTitle = "\"subscriptionIds\"";
-        if (push) {
-            resultTitle = "\"results\"";
-        }
+        String resultTitle = push ? "\"results\"" : "\"subscriptionIds\"";
         String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverseName().getCanonicalForm()
-                + "\", \"channelName\":\"" + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime
-                + "\":\"" + executionTimeString + "\", " + resultTitle + ":[";
+                + "\", \"channelName\":\"" + entityId.getEntityName() + "\", \""
+                + BADConstants.CHANNEL_EXECUTION_EPOCH_TIME + "\":" + executionTimeMili + ", " + resultTitle + ":[";
         jsonStr += sendData.get(endpoint);
-        jsonStr = jsonStr.substring(0, jsonStr.length());
         jsonStr += "]}";
         return jsonStr;
-
     }
 
     private void sendGroupOfResults(String endpoint) {
         String urlParameters = createData(endpoint);
-        try {
-            //Create connection
-            URL url = new URL(endpoint);
-            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("POST");
-            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.submit(() -> {
+            try {
+                //Create connection
+                URL url = new URL(endpoint);
+                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                connection.setRequestMethod("POST");
+                connection.setRequestProperty("Content-Type", "application/json");
 
-            connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
-            connection.setRequestProperty("Content-Language", "en-US");
+                connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
+                connection.setRequestProperty("Content-Language", "en-US");
 
-            connection.setUseCaches(false);
-            connection.setDoOutput(true);
-            connection.setConnectTimeout(500);
-            DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
-            wr.writeBytes(urlParameters);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                int responseCode = connection.getResponseCode();
-                LOGGER.info("\nSending 'POST' request to URL : " + url);
-                LOGGER.info("Post parameters : " + urlParameters);
-                LOGGER.info("Response Code : " + responseCode);
+                connection.setUseCaches(false);
+                connection.setDoOutput(true);
+                connection.setConnectTimeout(500);
+                DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+                wr.writeBytes(urlParameters);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    int responseCode = connection.getResponseCode();
+                    LOGGER.info("\nSending 'POST' request to URL : " + url);
+                    LOGGER.info("Post parameters : " + urlParameters);
+                    LOGGER.info("Response Code : " + responseCode);
+                }
+                wr.close();
+                connection.disconnect();
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
             }
-            wr.close();
-            connection.disconnect();
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
-        }
+        });
     }
 
     @Override
@@ -171,47 +178,55 @@
             eval0.evaluate(tRef, inputArg0);
             eval1.evaluate(tRef, inputArg1);
             eval2.evaluate(tRef, inputArg2);
+            eval3.evaluate(tRef, inputArg3);
 
             /*The incoming tuples have three fields:
              1. eval0 will get the serialized broker endpoint string
              2. eval1 will get the payload (either the subscriptionIds or entire results)
              3. eval2 will get the channel execution time stamp (the same for all tuples)
             */
-            if (executionTimeString == null) {
+            if (executionTimeMili == -1) {
                 int resultSetOffset = inputArg2.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
                 ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
-                executionTimeString = executionTime.toSimpleString();
+                executionTimeMili = executionTime.getChrononTime();
             }
 
+            // Get HTTP endpoint
             int serBrokerOffset = inputArg0.getStartOffset();
             bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
-            endpoint = stringSerDes.deserialize(di).getStringValue();
-            sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
-            try {
-                sendStreams.putIfAbsent(endpoint,
-                        new PrintStream(sendbaos.get(endpoint), true, StandardCharsets.UTF_8.name()));
-            } catch (UnsupportedEncodingException e) {
-                throw new HyracksDataException(e.getMessage());
+            String endpoint = stringSerDes.deserialize(di).getStringValue();
+
+            // Get broker type
+            int serTypeOffset = inputArg3.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serTypeOffset + 1);
+            String brokerType = stringSerDes.deserialize(di).getStringValue();
+            IPrinter currPrinter =
+                    brokerType.equals(BADConstants.BAD_BROKER_TYPE_NAME) ? admRecordPrinter : jsonRecordPrinter;
+
+            PrintStream currPrintStream = sendStreams.getOrDefault(endpoint, null);
+            if (currPrintStream == null) {
+                try {
+                    ByteArrayOutputStream newOutput = new ByteArrayOutputStream();
+                    sendbaos.putIfAbsent(endpoint, newOutput);
+                    sendStreams.put(endpoint, new PrintStream(newOutput, true, StandardCharsets.UTF_8.name()));
+                } catch (UnsupportedEncodingException e) {
+                    throw new HyracksDataException(e.getMessage());
+                }
+                currPrintStream = sendStreams.get(endpoint);
+            } else {
+                currPrintStream.append(',');
             }
 
             if (push) {
                 int pushOffset = inputArg1.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
-                if (!firstResult) {
-                    sendStreams.get(endpoint).append(',');
-                }
-                recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
-                        sendStreams.get(endpoint));
-
+                currPrinter.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+                        currPrintStream);
             } else {
-                if (!firstResult) {
-                    sendStreams.get(endpoint).append(',');
-                }
                 subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
-                        inputArg1.getLength(), sendStreams.get(endpoint));
+                        inputArg1.getLength(), currPrintStream);
             }
-            firstResult = false;
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntimeFactory.java
similarity index 86%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntimeFactory.java
index a7f12ba..b4c11ca 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntimeFactory.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,7 +15,7 @@
  * under the License.
  */
 
-package org.apache.asterix.bad.runtime;
+package org.apache.asterix.bad.runtime.operators;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.om.types.IAType;
@@ -34,16 +32,18 @@
     private final IScalarEvaluatorFactory brokerEvalFactory;
     private final IScalarEvaluatorFactory pushListEvalFactory;
     private final IScalarEvaluatorFactory channelExecutionEvalFactory;
+    private final IScalarEvaluatorFactory brokerTypeEvalFactory;
     private final EntityId entityId;
     private final boolean push;
     private final IAType recordType;
 
     public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory,
             IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
-            EntityId entityId, boolean push, IAType recordType) {
+            IScalarEvaluatorFactory brokerTypeEvalFactory, EntityId entityId, boolean push, IAType recordType) {
         this.brokerEvalFactory = brokerEvalFactory;
         this.pushListEvalFactory = pushListEvalFactory;
         this.channelExecutionEvalFactory = channelExecutionEvalFactory;
+        this.brokerTypeEvalFactory = brokerTypeEvalFactory;
         this.entityId = entityId;
         this.push = push;
         this.recordType = recordType;
@@ -57,6 +57,6 @@
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
         return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, pushListEvalFactory,
-                channelExecutionEvalFactory, entityId, push, recordType) };
+                channelExecutionEvalFactory, brokerTypeEvalFactory, entityId, push, recordType) };
     }
 }
diff --git a/asterix-bad/src/main/resources/META-INF/services/org.apache.asterix.om.functions.IFunctionRegistrant b/asterix-bad/src/main/resources/META-INF/services/org.apache.asterix.om.functions.IFunctionRegistrant
new file mode 100644
index 0000000..fdb3724
--- /dev/null
+++ b/asterix-bad/src/main/resources/META-INF/services/org.apache.asterix.om.functions.IFunctionRegistrant
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.asterix.bad.function.BADFunctionRegistrant
\ No newline at end of file
diff --git a/asterix-bad/src/main/resources/asterix-build-configuration.xml b/asterix-bad/src/main/resources/asterix-build-configuration.xml
index 6007416..0f3cec9 100644
--- a/asterix-bad/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-bad/src/main/resources/asterix-build-configuration.xml
@@ -36,13 +36,13 @@
   </transactionLogDir>
   <extensions>
     <extension>
-      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+      <extensionClassName>org.apache.asterix.bad.extension.BADQueryTranslatorExtension</extensionClassName>
     </extension>
     <extension>
-      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+      <extensionClassName>org.apache.asterix.bad.extension.BADLangExtension</extensionClassName>
     </extension>
     <extension>
-      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+      <extensionClassName>org.apache.asterix.bad.extension.BADMetadataExtension</extensionClassName>
     </extension>
   </extensions>
   <property>
diff --git a/asterix-bad/src/main/resources/cc.conf b/asterix-bad/src/main/resources/cc.conf
index 371cbe8..9e9d26b 100644
--- a/asterix-bad/src/main/resources/cc.conf
+++ b/asterix-bad/src/main/resources/cc.conf
@@ -53,11 +53,11 @@
 messaging.frame.size=4096
 messaging.frame.count=512
 
-[extension/org.apache.asterix.bad.lang.BADQueryTranslatorExtension]
+[extension/org.apache.asterix.bad.extension.BADQueryTranslatorExtension]
 enabled = true
-[extension/org.apache.asterix.bad.lang.BADLangExtension]
+[extension/org.apache.asterix.bad.extension.BADLangExtension]
 enabled = true
-[extension/org.apache.asterix.bad.metadata.BADMetadataExtension]
+[extension/org.apache.asterix.bad.extension.BADMetadataExtension]
 enabled = true
-[extension/org.apache.asterix.bad.recovery.BADRecoveryExtension]
+[extension/org.apache.asterix.bad.extension.BADRecoveryExtension]
 enabled = true
\ No newline at end of file
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 59e1a8d..f2b6410 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -22,7 +22,9 @@
 import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
 import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
 import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
-import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.AbstractCreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateRepetitiveChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateContinuousChannelStatement;
 import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
 import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
@@ -30,6 +32,21 @@
 import org.apache.asterix.lang.sqlpp.parser.SqlppParseException;
 import org.apache.asterix.lang.sqlpp.parser.Token;
 
+@new
+<DEFAULT,IN_DBL_BRACE>
+TOKEN [IGNORE_CASE]:
+{
+  <BROKER : "broker">
+  |  <CHANNEL : "channel">
+  |  <PROCEDURE : "procedure">
+  |  <SUBSCRIBE : "subscribe">
+  |  <ACTIVE : "active">
+  | <UNSUBSCRIBE : "unsubscribe">
+  | <REPETITIVE : "repetitive">
+  | <CONTINUOUS : "continuous">
+  | <PERIOD : "period">
+  | <PUSH : "push">
+}
 
 @merge
 Statement SingleStatement() throws ParseException:
@@ -59,9 +76,10 @@
   (
     // merge area 2
     before:
-    after:    | stmt = CreateChannelStatement()
-              | stmt = CreateBrokerStatement()
+    after:    | stmt = CreateChannelStatement(startToken)
+              | stmt = CreateBrokerStatement(startToken)
               | stmt = CreateProcedureStatement()
+              | stmt = ActiveStatementSpecification(startToken)
   )
   {
     // merge area 3
@@ -89,29 +107,160 @@
 }
 
 @new
-CreateChannelStatement CreateChannelStatement() throws ParseException:
+Statement ActiveStatementSpecification(Token startStmtToken) throws ParseException:
+{
+    Statement stmt = null;
+}
+{
+  "ACTIVE"
+   stmt = ActiveDatasetSpecification(startStmtToken)
+  {
+      return stmt;
+  }
+}
+
+@new
+DatasetDecl ActiveDatasetSpecification(Token startStmtToken) throws ParseException:
+{
+  Pair<DataverseName,Identifier> nameComponents = null;
+  TypeExpression activeRecordTypeExpr = new TypeReferenceExpression(
+                new Pair<DataverseName,Identifier>(DataverseName.createBuiltinDataverseName("Metadata"), new Identifier("ActiveRecordType")));
+  TypeExpression datasetTypeExpr = null;
+  boolean ifNotExists = false;
+  Map<String,String> properties = null;
+  Pair<List<Integer>, List<List<String>>> primaryKeyFields = null;
+  String nodeGroupName = null;
+  Map<String,String> hints = new HashMap<String,String>();
+  DatasetDecl stmt = null;
+  boolean autogenerated = false;
+  RecordConstructor withRecord = null;
+
+}
+{
+    Dataset() nameComponents = QualifiedName()
+    datasetTypeExpr = DatasetTypeSpecification()
+    ifNotExists = IfNotExists()
+    primaryKeyFields = PrimaryKey()
+    (<AUTOGENERATED> { autogenerated = true; } )?
+    (<ON> nodeGroupName = Identifier() )?
+    ( <HINTS> hints = Properties() )?
+    ( <WITH> withRecord = RecordConstructor() )?
+      {
+        // TODO: add filters on meta records
+        InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
+                                                          primaryKeyFields.first,
+                                                          autogenerated,
+                                                          null);
+        try{
+        stmt = new DatasetDecl(nameComponents.first,
+                                   nameComponents.second,
+                                   datasetTypeExpr,
+                                   activeRecordTypeExpr,
+                                   nodeGroupName != null ? new Identifier(nodeGroupName) : null,
+                                   hints,
+                                   DatasetType.INTERNAL,
+                                   idd,
+                                   withRecord,
+                                   ifNotExists);
+
+        } catch (CompilationException e){
+           throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+        }
+      }
+    {
+      return addSourceLocation(stmt, startStmtToken);
+    }
+}
+
+
+@new
+AbstractCreateChannelStatement CreateChannelStatement(Token startStmtToken) throws ParseException:
+{
+  AbstractCreateChannelStatement ccs = null;
+}
+{
+  (
+    <REPETITIVE> { ccs = CreateRepetitiveChannel(startStmtToken); }
+    | <CONTINUOUS> { ccs = CreateContinuousChannel(startStmtToken); }
+  )
+  {
+    return ccs;
+  }
+}
+
+@new
+CreateRepetitiveChannelStatement CreateRepetitiveChannel(Token startStmtToken) throws ParseException:
 {
   Pair<DataverseName,Identifier> nameComponents = null;
   FunctionSignature appliedFunction = null;
-  CreateChannelStatement ccs = null;
+  CreateRepetitiveChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
   boolean push = false;
 }
 {
-  (
-    "repetitive"
-    ( "push" { push = true; } )?
-     "channel"  nameComponents = QualifiedName()
-    <USING> appliedFunction = FunctionSignature()
-    "period" period = FunctionCallExpr()
+  ( <PUSH> { push = true; } )?
+  <CHANNEL>  nameComponents = QualifiedName()
+  <USING> appliedFunction = FunctionSignature()
+  <PERIOD> period = FunctionCallExpr()
+  {
+    ccs = new CreateRepetitiveChannelStatement(nameComponents.first,
+                                 nameComponents.second, appliedFunction, period, push);
+  }
+  {
+    return ccs;
+  }
+}
+
+
+@new
+CreateContinuousChannelStatement CreateContinuousChannel(Token startStmtToken) throws ParseException:
+{
+  // Channel related
+  CreateContinuousChannelStatement ccs = null;
+  Expression period = null;
+  boolean push = false;
+  boolean ifNotExists = false;
+  // Function related
+  FunctionSignature signature;
+  String functionBody;
+  Expression functionBodyExpr;
+  Token beginPos;
+  Token endPos;
+  FunctionName fctName = null;
+  TypeExpression returnType = null;
+  List<Pair<VarIdentifier,TypeExpression>> params = null;
+  DataverseName currentDataverse = defaultDataverse;
+}
+{
+     ( <PUSH> { push = true; } )?
+     <CHANNEL>
+     fctName = FunctionName()
+     {
+        defaultDataverse = fctName.dataverse;
+     }
+     ifNotExists = IfNotExists()
+     params = FunctionParameters()
+     <PERIOD> period = FunctionCallExpr()
+    <LEFTBRACE>
+      {
+        createNewScope();
+        beginPos = token;
+      }
+      functionBodyExpr = FunctionBody()
+      returnType = FunctionReturnType()
+    <RIGHTBRACE>
     {
-      ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, period, push);
-    }
-  )
-    {
-      return ccs;
+      endPos = token;
+      functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+      signature = new FunctionSignature(fctName.dataverse, fctName.function, params.size());
+      getCurrentScope().addFunctionDescriptor(signature, false);
+      removeCurrentScope();
+      defaultDataverse = currentDataverse;
+      ensureNoTypeDeclsInFunction(fctName.function, params, returnType, startStmtToken);
+      CreateFunctionStatement stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, false);
+      ccs =  new CreateContinuousChannelStatement(fctName.dataverse, new Identifier(fctName.function), period, push, stmt);
+      return addSourceLocation(ccs, startStmtToken);
     }
 }
 
@@ -131,7 +280,7 @@
   createNewScope();
 }
 {
-     "procedure" fctName = FunctionName()
+     <PROCEDURE> fctName = FunctionName()
      {
         defaultDataverse = fctName.dataverse;
      }
@@ -154,7 +303,7 @@
       removeCurrentScope();
       defaultDataverse = currentDataverse;
     }
-  ("period" period = FunctionCallExpr())?
+  (<PERIOD> period = FunctionCallExpr())?
   {
   List<VarIdentifier> paramListVariablesOnly = new ArrayList<VarIdentifier>();
   for(Pair<VarIdentifier,TypeExpression> p: paramList){
@@ -195,18 +344,24 @@
 }
 
 @new
-CreateBrokerStatement CreateBrokerStatement() throws ParseException:
+CreateBrokerStatement CreateBrokerStatement(Token startStmtToken) throws ParseException:
 {
   CreateBrokerStatement cbs = null;
   Pair<DataverseName,Identifier> name = null;
   String endPoint = null;
+  RecordConstructor withRecord = null;
 }
 {
   (
-    "broker"  name = QualifiedName()
+    <BROKER>  name = QualifiedName()
     <AT>  endPoint = StringLiteral()
+    ( <WITH> withRecord = RecordConstructor() )?
     {
-      cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+      try {
+        cbs = new CreateBrokerStatement(name.first, name.second, endPoint, withRecord);
+      } catch (CompilationException e) {
+        throw new SqlppParseException(getSourceLocation(startStmtToken), e.getMessage());
+      }
     }
   )
     {
@@ -227,7 +382,7 @@
 }
 {
   (
-  "subscribe" <TO> nameComponents = QualifiedName()
+  <SUBSCRIBE> <TO> nameComponents = QualifiedName()
    <LEFTPAREN> (tmp = Expression()
    {
       argList.add(tmp);
@@ -240,7 +395,7 @@
    {
       stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
    }
-   | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+   | <UNSUBSCRIBE> id = StringLiteral() <FROM> nameComponents = QualifiedName()
       {
         VariableExpr varExp = new VariableExpr(new VarIdentifier("$subscriptionPlaceholder"));
         getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
@@ -274,7 +429,7 @@
   boolean ifExists = false;
 }
 {
-  "channel" pairId = QualifiedName() ifExists = IfExists()
+  <CHANNEL> pairId = QualifiedName() ifExists = IfExists()
   {
     stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
     return addSourceLocation(stmt, startStmtToken);
@@ -289,7 +444,7 @@
   boolean ifExists = false;
 }
 {
-  "broker" pairId = QualifiedName() ifExists = IfExists()
+  <BROKER> pairId = QualifiedName() ifExists = IfExists()
   {
     stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
     return addSourceLocation(stmt, startStmtToken);
@@ -304,7 +459,7 @@
   boolean ifExists = false;
 }
 {
-  "procedure" funcSig = FunctionSignature() ifExists = IfExists()
+  <PROCEDURE> funcSig = FunctionSignature() ifExists = IfExists()
   {
     stmt = new ProcedureDropStatement(funcSig, ifExists);
     return addSourceLocation(stmt, startStmtToken);
diff --git a/asterix-bad/src/main/resources/log4j2-bad.xml b/asterix-bad/src/main/resources/log4j2-bad.xml
new file mode 100644
index 0000000..fcbb6bb
--- /dev/null
+++ b/asterix-bad/src/main/resources/log4j2-bad.xml
@@ -0,0 +1,45 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+    <File name="InfoLog" fileName="target/info.log">
+        <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </File>
+  </Appenders>
+  <Loggers>
+    <Root level="INFO">
+      <AppenderRef ref="InfoLog"/>
+    </Root>
+    <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
+    <Logger name="org.apache.hyracks" level="INFO"/>
+    <Logger name="org.apache.asterix" level="INFO"/>
+    <Logger name="org.apache.hyracks.algebricks" level="TRACE">
+      <AppenderRef ref="Console"/>
+    </Logger>
+    <Logger name="org.apache.hyracks.algebricks" level="TRACE">
+      <AppenderRef ref="InfoLog"/>
+    </Logger>
+    <Logger name="org.apache.asterix.bad.metadata" level="WARN">
+      <AppenderRef ref="Console"/>
+    </Logger>
+  </Loggers>
+</Configuration>
