introduced feed runtime factory for managing feeds
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 4022584..f144a34 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
@@ -90,7 +91,9 @@
tupleCount++;
break;
case NO_MORE_DATA:
- LOGGER.info("Reached end of feed");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Reached end of feed");
+ }
FrameUtils.flushFrame(frame, writer);
continueIngestion = false;
break;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index 1112fdd..312ad46 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -72,12 +72,13 @@
public void start() throws Exception {
state = State.ACTIVE_INGESTION;
FeedRuntime ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
- ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
+ FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
}
- getFeedExecutorService().execute(feedInboxMonitor);
- getFeedExecutorService().execute(adapterExecutor);
+ ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
+ executorService.execute(feedInboxMonitor);
+ executorService.execute(adapterExecutor);
}
@Override
@@ -198,9 +199,4 @@
return partition;
}
- @Override
- public ExecutorService getFeedExecutorService() {
- return FeedManager.INSTANCE.getFeedExecutorService(feedId);
- }
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 3293d26..f36639f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -100,8 +100,8 @@
LOGGER.info("Continuing on failure as per feed policy");
}
adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
- FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
writer.fail();
+ FeedManager.INSTANCE.deregisterFeed(feedId);
/*
* Do not de-register feed
*/
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index a67e137..477aa30 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -14,20 +14,14 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- */
public class FeedManager implements IFeedManager {
private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
@@ -38,134 +32,82 @@
}
- private Map<FeedConnectionId, SuperFeedManager> superFeedManagers = new HashMap<FeedConnectionId, SuperFeedManager>();
- private Map<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>> feedRuntimes = new HashMap<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>>();
- private Map<FeedConnectionId, ExecutorService> feedExecutorService = new HashMap<FeedConnectionId, ExecutorService>();
- private Map<FeedConnectionId, FeedMessageService> feedMessageService = new HashMap<FeedConnectionId, FeedMessageService>();
+ private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
- return feedExecutorService.get(feedId);
+ FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+ return mgr == null ? null : mgr.getExecutorService();
}
public FeedMessageService getFeedMessageService(FeedConnectionId feedId) {
- return feedMessageService.get(feedId);
+ FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+ return mgr == null ? null : mgr.getMessageService();
}
@Override
public void deregisterFeed(FeedConnectionId feedId) {
try {
- Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedId);
- if (feedRuntimesForFeed != null) {
- feedRuntimesForFeed.clear();
- }
-
- feedRuntimes.remove(feedId);
-
- SuperFeedManager sfm = superFeedManagers.get(feedId);
- if (sfm != null && sfm.isLocal()) {
- sfm.stop();
+ FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+ if (mgr == null) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Shutdown super feed manager " + sfm);
+ LOGGER.warning("unknown feed id: " + feedId);
}
}
-
- ExecutorService executorService = feedExecutorService.remove(feedId);
- if (executorService != null && !executorService.isShutdown()) {
- executorService.shutdownNow();
- }
- superFeedManagers.remove(feedId);
- feedMessageService.remove(feedId);
-
+ mgr.close();
} catch (Exception e) {
- e.printStackTrace();
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("unable to shutdown feed services for" + feedId);
+ LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
}
+ e.printStackTrace();
}
+
+ feedRuntimeManagers.remove(feedId);
}
@Override
- public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
+ public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
FeedConnectionId feedId = feedRuntime.getFeedRuntimeId().getFeedId();
- ExecutorService execService = feedExecutorService.get(feedId);
- if (execService == null) {
- execService = Executors.newCachedThreadPool();
- feedExecutorService.put(feedId, execService);
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
+ if (runtimeMgr == null) {
+ synchronized (feedRuntimeManagers) {
+ if (runtimeMgr == null) {
+ runtimeMgr = new FeedRuntimeManager(feedId);
+ feedRuntimeManagers.put(feedId, runtimeMgr);
+ }
+ }
}
- Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntime.getFeedRuntimeId()
- .getFeedId());
- if (feedRuntimesForFeed == null) {
- feedRuntimesForFeed = new HashMap<FeedRuntimeId, FeedRuntime>();
- feedRuntimes.put(feedRuntime.getFeedRuntimeId().getFeedId(), feedRuntimesForFeed);
+ runtimeMgr.registerFeedRuntime(feedRuntime.getFeedRuntimeId(), feedRuntime);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered runtime " + feedRuntime + " for feed " + feedId);
}
- feedRuntimesForFeed.put(feedRuntime.getFeedRuntimeId(), feedRuntime);
- System.out.println("REGISTERED feed runtime " + feedRuntime);
- return execService;
}
@Override
public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId) {
- Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntimeId.getFeedId());
- if (feedRuntimesForFeed != null) {
- FeedRuntime feedRuntime = feedRuntimesForFeed.get(feedRuntimeId);
- if (feedRuntime != null) {
- feedRuntimesForFeed.remove(feedRuntimeId);
- if (feedRuntimesForFeed.isEmpty()) {
- System.out.println("CLEARING OUT FEED RUNTIME INFO" + feedRuntimeId.getFeedId());
- feedRuntimes.remove(feedRuntimeId.getFeedId());
- }
- }
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
+ if (runtimeMgr != null) {
+ runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
}
-
}
@Override
public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId) {
- Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntimeId.getFeedId());
- if (feedRuntimesForFeed != null) {
- return feedRuntimesForFeed.get(feedRuntimeId);
- }
- return null;
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
+ return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
}
@Override
public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception {
- boolean overriden = superFeedManagers.get(feedId) != null;
- superFeedManagers.put(feedId, sfm);
- FeedMessageService mesgService = feedMessageService.get(feedId);
- if (overriden && mesgService != null) {
- mesgService.stop();
- }
- if (mesgService == null || overriden) {
- mesgService = new FeedMessageService(feedId);
- feedMessageService.put(feedId, mesgService);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Started Feed Message Service for feed :" + feedId);
- }
- mesgService.start();
- }
- }
-
- @Override
- public void deregisterSuperFeedManager(FeedConnectionId feedId) {
- SuperFeedManager sfm = superFeedManagers.remove(feedId);
- try {
- if (sfm.isLocal()) {
- sfm.stop();
- }
- FeedMessageService fms = feedMessageService.remove(feedId);
- if (fms != null) {
- fms.stop();
- }
- } catch (IOException e) {
- e.printStackTrace();
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
+ if (runtimeMgr != null) {
+ runtimeMgr.setSuperFeedManager(sfm);
}
}
@Override
public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId) {
- return superFeedManagers.get(feedId);
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
+ return runtimeMgr != null ? runtimeMgr.getSuperFeedManager() : null;
}
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 81cbd96..f3b99c8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -80,27 +80,15 @@
synchronized (FeedManager.INSTANCE) {
INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
String nodeId = ncCtx.getNodeId();
-
if (sfm.getNodeId().equals(nodeId)) {
- SuperFeedManager currentManager = FeedManager.INSTANCE.getSuperFeedManager(feedId);
- if (currentManager != null) {
- currentManager.stop();
- FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
- }
-
sfm.setLocal(true);
- sfm.start();
- System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Started Super Feed Manager for " + feedId);
- }
} else {
Thread.sleep(5000);
}
FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
- System.out.println("REGISTERED SUPER FEED MANAGER ! + is LOCAL ?" + sfm.isLocal());
-
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered super feed mgr " + sfm + " for feed " + feedId);
+ }
}
break;
@@ -121,5 +109,4 @@
writer.close();
}
}
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index f933504..6523914 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -91,7 +91,7 @@
feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
if (feedRuntime == null) {
feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
- feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
+ FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
}
@@ -100,9 +100,9 @@
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
}
- feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
resumeOldState = true;
}
+ feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
runtimeType, partition, fta);
coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
index 3c151f3..0d1f955 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
@@ -1,15 +1,20 @@
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.concurrent.ExecutorService;
-
public interface IAdapterExecutor {
+ /**
+ * @throws Exception
+ */
public void start() throws Exception;
+ /**
+ * @throws Exception
+ */
public void stop() throws Exception;
+ /**
+ * @return
+ */
public FeedConnectionId getFeedId();
- public ExecutorService getFeedExecutorService();
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index 278fadd..ac33966 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -34,7 +34,7 @@
* @param feedRuntime
* @throws Exception
*/
- public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
+ public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
/**
* @param feedRuntimeId
@@ -56,18 +56,13 @@
/**
* @param feedId
- */
- public void deregisterSuperFeedManager(FeedConnectionId feedId);
-
- /**
- * @param feedId
* @return
*/
public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId);
/**
* @param feedId
- * @throws IOException
+ * @throws IOException
*/
void deregisterFeed(FeedConnectionId feedId) throws IOException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index fcc9026..95b0da8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -41,14 +41,14 @@
private final FeedConnectionId feedConnectionId;
- // private MessageListener listener;
-
private boolean isLocal = false;
private SuperFeedManagerService sfmService;
private LinkedBlockingQueue<String> inbox;
+ private boolean started = false;
+
public enum FeedReportMessageType {
CONGESTION,
THROUGHPUT
@@ -93,6 +93,7 @@
executorService.execute(sfmService);
}
System.out.println("STARTED SUPER FEED MANAGER!");
+ started = true;
}
public void stop() throws IOException {
@@ -102,7 +103,8 @@
@Override
public String toString() {
- return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
+ return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]"
+ + (isLocal ? started ? "Started " : "Not Started" : " Remote ");
}
public static class SuperFeedManagerMessages {
@@ -365,4 +367,8 @@
}
}
+ public boolean isStarted() {
+ return started;
+ }
+
}
\ No newline at end of file