adding missing file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
new file mode 100644
index 0000000..e2c3182
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
@@ -0,0 +1,187 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
+
+public class FeedRuntimeManager {
+
+ private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
+
+ private final FeedConnectionId feedId;
+ private SuperFeedManager superFeedManager;
+ private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
+ private final ExecutorService executorService;
+ private FeedMessageService messageService;
+ private SocketFactory socketFactory = new SocketFactory();
+
+ public FeedRuntimeManager(FeedConnectionId feedId) {
+ this.feedId = feedId;
+ feedRuntimes = new HashMap<FeedRuntimeId, FeedRuntime>();
+ executorService = Executors.newCachedThreadPool();
+ }
+
+ public void close() throws IOException {
+ socketFactory.close();
+ if (executorService != null) {
+ executorService.shutdownNow();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shut down executor service for :" + feedId);
+ }
+ }
+
+ if (messageService != null) {
+ messageService.stop();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shut down message service s for :" + feedId);
+ }
+ }
+ if (superFeedManager != null && superFeedManager.isLocal()) {
+ superFeedManager.stop();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shut down super feed manager for :" + feedId);
+ }
+ }
+ }
+
+ public void setSuperFeedManager(SuperFeedManager sfm) throws UnknownHostException, IOException, Exception {
+ this.superFeedManager = sfm;
+ if (sfm.isLocal()) {
+ sfm.start();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started Super Feed Manager for feed :" + feedId);
+ }
+ this.messageService = new FeedMessageService(feedId);
+ messageService.start();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started Feed Message Service for feed :" + feedId);
+ }
+ }
+
+ public SuperFeedManager getSuperFeedManager() {
+ return superFeedManager;
+ }
+
+ public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
+ return feedRuntimes.get(runtimeId);
+ }
+
+ public void registerFeedRuntime(FeedRuntimeId runtimeId, FeedRuntime feedRuntime) {
+ feedRuntimes.put(runtimeId, feedRuntime);
+ }
+
+ public void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
+ feedRuntimes.remove(runtimeId);
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public FeedMessageService getMessageService() {
+ return messageService;
+ }
+
+ public Socket createClientSocket(String host, int port) throws UnknownHostException, IOException {
+ return socketFactory.createClientSocket(host, port);
+ }
+
+ public ServerSocket createServerSocket(int port) throws IOException {
+ return socketFactory.createServerSocket(port);
+ }
+
+ private static class SocketFactory {
+
+ private Map<SocketId, Socket> sockets = new HashMap<SocketId, Socket>();
+ private List<ServerSocket> serverSockets = new ArrayList<ServerSocket>();
+
+ public Socket createClientSocket(String host, int port) throws UnknownHostException, IOException {
+ Socket socket = new Socket(host, port);
+ sockets.put(new SocketId(host, port), socket);
+ return socket;
+ }
+
+ public void close() throws IOException {
+ for (ServerSocket socket : serverSockets) {
+ socket.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed server socket :" + socket);
+ }
+ }
+
+ for (Entry<SocketId, Socket> entry : sockets.entrySet()) {
+ entry.getValue().close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed client socket :" + entry.getKey());
+ }
+ }
+ }
+
+ public ServerSocket createServerSocket(int port) throws IOException {
+ ServerSocket socket = new ServerSocket(port);
+ serverSockets.add(socket);
+ return socket;
+ }
+
+ private static class SocketId {
+ private final String host;
+ private final int port;
+
+ public SocketId(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public SocketId(int port) {
+ this.host = "127.0.0.1";
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public String toString() {
+ return host + "[" + port + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof SocketId)) {
+ return false;
+ }
+
+ return ((SocketId) o).getHost().equals(host) && ((SocketId) o).getPort() == port;
+ }
+
+ }
+ }
+
+ public FeedConnectionId getFeedId() {
+ return feedId;
+ }
+
+}