Add push-based channels and improve broker notifications

Change-Id: Ie3c7cae0f015d6bc01dd912499565bb12c15abc3
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
new file mode 100644
index 0000000..e20638b
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Check a push-based channel plan
+ * Expected Res : Success
+ * Date         : Mar 2018
+ */
+
+drop dataverse channels7 if exists;
+create dataverse channels7;
+use channels7;
+
+create type UserLocation as {
+    location: circle,
+    userName: string,
+    timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+    location: circle,
+    userName: string
+};
+
+create type EmergencyReport as {
+    reportId: uuid,
+    Etype: string,
+    location: circle,
+    timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+    Etype: string,
+    location: circle
+};
+
+
+create type EmergencyShelter as {
+    shelterName: string,
+    location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+    select report, shelters from
+     ( select value r from Reports r where r.timeStamp >
+     current_datetime() - day_time_duration("PT10S"))report,
+    UserLocations u
+    let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+    where u.userName = userName
+    and spatial_intersect(report.location,u.location)
+  )
+};
+
+write output to nc1:"rttest/channel-push.sqlpp";
+
+create repetitive push channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
new file mode 100644
index 0000000..770617f
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
@@ -0,0 +1,64 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- HYBRID_HASH_JOIN [$$128][$$135]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- HYBRID_HASH_JOIN [$$126, $$124][$$117, $$118]  |PARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$126, $$124]  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
+                    -- NESTED_LOOP  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                        -- PRE_CLUSTERED_GROUP_BY[$$120]  |PARTITIONED|
+                                {
+                                  -- AGGREGATE  |LOCAL|
+                                    -- STREAM_SELECT  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                }
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STABLE_SORT [$$120(ASC)]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
+                                -- NESTED_LOOP  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file