Enable Feed Changes to work with BAD project
Extracts the ActiveListener
Enables listeners to survive after job executions
Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1524
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index e4491bd..d7998f8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -55,14 +55,15 @@
if (entityId != null) {
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
- LOGGER.log(Level.FINER, "Notifying the listener");
- listener.notify(event);
if (event.getEventKind() == Kind.JOB_FINISHED) {
LOGGER.log(Level.FINER, "Removing the job");
jobId2ActiveJobInfos.remove(event.getJobId());
- LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
- entityEventListeners.remove(listener.getEntityId());
}
+ if (listener != null) {
+ LOGGER.log(Level.FINER, "Notifying the listener");
+ listener.notify(event);
+ }
+
} else {
LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId());
}
@@ -75,6 +76,11 @@
LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
}
+ public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+ LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
+ unregisterListener(listener);
+ }
+
public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
if (DEBUG) {
LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
new file mode 100644
index 0000000..365c3ce
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener {
+
+ // members
+ protected EntityId entityId;
+ protected List<IDataset> datasets;
+ protected volatile ActivityState state;
+ protected JobId jobId;
+
+ @Override
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ @Override
+ public ActivityState getState() {
+ return state;
+ }
+
+ @Override
+ public boolean isEntityUsingDataset(IDataset dataset) {
+ return datasets.contains(dataset);
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 2a87cab..f49da3c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -25,9 +25,9 @@
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveEventSubscriber;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.metadata.IDataset;
@@ -36,20 +36,15 @@
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
-public class FeedEventsListener implements IActiveEntityEventsListener {
+public class FeedEventsListener extends ActiveEntityEventsListener {
// constants
private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName());
// members
- private final EntityId entityId;
- private final List<IDataset> datasets;
private final String[] sources;
private final List<IActiveEventSubscriber> subscribers;
- private volatile ActivityState state;
private int numRegistered;
- private JobId jobId;
public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) {
this.entityId = entityId;
@@ -111,6 +106,7 @@
IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
JobStatus status = hcc.getJobStatus(jobId);
state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
+ ActiveJobNotificationHandler.INSTANCE.removeListener(this);
}
private void start(ActiveEvent event) {
@@ -119,16 +115,6 @@
}
@Override
- public EntityId getEntityId() {
- return entityId;
- }
-
- @Override
- public ActivityState getState() {
- return state;
- }
-
- @Override
public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException {
if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
throw new HyracksDataException("Can only wait for STARTED or STOPPED state");
@@ -150,11 +136,6 @@
return subscriber;
}
- @Override
- public boolean isEntityUsingDataset(IDataset dataset) {
- return datasets.contains(dataset);
- }
-
public String[] getSources() {
return sources;
}