Enable Feed Changes to work with BAD project

Extracts the ActiveListener
Enables listeners to survive after job executions

Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1524
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index e4491bd..d7998f8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -55,14 +55,15 @@
                 if (entityId != null) {
                     IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
                     LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
-                    LOGGER.log(Level.FINER, "Notifying the listener");
-                    listener.notify(event);
                     if (event.getEventKind() == Kind.JOB_FINISHED) {
                         LOGGER.log(Level.FINER, "Removing the job");
                         jobId2ActiveJobInfos.remove(event.getJobId());
-                        LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
-                        entityEventListeners.remove(listener.getEntityId());
                     }
+                    if (listener != null) {
+                        LOGGER.log(Level.FINER, "Notifying the listener");
+                        listener.notify(event);
+                    }
+
                 } else {
                     LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId());
                 }
@@ -75,6 +76,11 @@
         LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
     }
 
+    public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+        LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
+        unregisterListener(listener);
+    }
+
     public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
new file mode 100644
index 0000000..365c3ce
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener {
+
+    // members
+    protected EntityId entityId;
+    protected List<IDataset> datasets;
+    protected volatile ActivityState state;
+    protected JobId jobId;
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public ActivityState getState() {
+        return state;
+    }
+
+    @Override
+    public boolean isEntityUsingDataset(IDataset dataset) {
+        return datasets.contains(dataset);
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 2a87cab..f49da3c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -25,9 +25,9 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.metadata.IDataset;
@@ -36,20 +36,15 @@
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 
-public class FeedEventsListener implements IActiveEntityEventsListener {
+public class FeedEventsListener extends ActiveEntityEventsListener {
     // constants
     private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName());
     // members
-    private final EntityId entityId;
-    private final List<IDataset> datasets;
     private final String[] sources;
     private final List<IActiveEventSubscriber> subscribers;
-    private volatile ActivityState state;
     private int numRegistered;
-    private JobId jobId;
 
     public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) {
         this.entityId = entityId;
@@ -111,6 +106,7 @@
         IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
         JobStatus status = hcc.getJobStatus(jobId);
         state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
+        ActiveJobNotificationHandler.INSTANCE.removeListener(this);
     }
 
     private void start(ActiveEvent event) {
@@ -119,16 +115,6 @@
     }
 
     @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    @Override
-    public ActivityState getState() {
-        return state;
-    }
-
-    @Override
     public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException {
         if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
             throw new HyracksDataException("Can only wait for STARTED or STOPPED state");
@@ -150,11 +136,6 @@
         return subscriber;
     }
 
-    @Override
-    public boolean isEntityUsingDataset(IDataset dataset) {
-        return datasets.contains(dataset);
-    }
-
     public String[] getSources() {
         return sources;
     }