introduced feed runtime factory for managing feeds
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 4022584..f144a34 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
@@ -90,7 +91,9 @@
                         tupleCount++;
                         break;
                     case NO_MORE_DATA:
-                        LOGGER.info("Reached end of feed");
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Reached end of feed");
+                        }
                         FrameUtils.flushFrame(frame, writer);
                         continueIngestion = false;
                         break;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index 1112fdd..312ad46 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -72,12 +72,13 @@
     public void start() throws Exception {
         state = State.ACTIVE_INGESTION;
         FeedRuntime ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
-        ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
+        FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
         }
-        getFeedExecutorService().execute(feedInboxMonitor);
-        getFeedExecutorService().execute(adapterExecutor);
+        ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
+        executorService.execute(feedInboxMonitor);
+        executorService.execute(adapterExecutor);
     }
 
     @Override
@@ -198,9 +199,4 @@
         return partition;
     }
 
-    @Override
-    public ExecutorService getFeedExecutorService() {
-        return FeedManager.INSTANCE.getFeedExecutorService(feedId);
-    }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 3293d26..f36639f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -100,8 +100,8 @@
                     LOGGER.info("Continuing on failure as per feed policy");
                 }
                 adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
-                FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
                 writer.fail();
+                FeedManager.INSTANCE.deregisterFeed(feedId);
                 /*
                  * Do not de-register feed 
                  */
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index a67e137..477aa30 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -14,20 +14,14 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
 
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- */
 public class FeedManager implements IFeedManager {
 
     private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
@@ -38,134 +32,82 @@
 
     }
 
-    private Map<FeedConnectionId, SuperFeedManager> superFeedManagers = new HashMap<FeedConnectionId, SuperFeedManager>();
-    private Map<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>> feedRuntimes = new HashMap<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>>();
-    private Map<FeedConnectionId, ExecutorService> feedExecutorService = new HashMap<FeedConnectionId, ExecutorService>();
-    private Map<FeedConnectionId, FeedMessageService> feedMessageService = new HashMap<FeedConnectionId, FeedMessageService>();
+    private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
 
     public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
-        return feedExecutorService.get(feedId);
+        FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+        return mgr == null ? null : mgr.getExecutorService();
     }
 
     public FeedMessageService getFeedMessageService(FeedConnectionId feedId) {
-        return feedMessageService.get(feedId);
+        FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+        return mgr == null ? null : mgr.getMessageService();
     }
 
     @Override
     public void deregisterFeed(FeedConnectionId feedId) {
         try {
-            Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedId);
-            if (feedRuntimesForFeed != null) {
-                feedRuntimesForFeed.clear();
-            }
-
-            feedRuntimes.remove(feedId);
-
-            SuperFeedManager sfm = superFeedManagers.get(feedId);
-            if (sfm != null && sfm.isLocal()) {
-                sfm.stop();
+            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+            if (mgr == null) {
                 if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Shutdown super feed manager " + sfm);
+                    LOGGER.warning("unknown feed id: " + feedId);
                 }
             }
-
-            ExecutorService executorService = feedExecutorService.remove(feedId);
-            if (executorService != null && !executorService.isShutdown()) {
-                executorService.shutdownNow();
-            }
-            superFeedManagers.remove(feedId);
-            feedMessageService.remove(feedId);
-
+            mgr.close();
         } catch (Exception e) {
-            e.printStackTrace();
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("unable to shutdown feed services for" + feedId);
+                LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
             }
+            e.printStackTrace();
         }
+
+        feedRuntimeManagers.remove(feedId);
     }
 
     @Override
-    public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
+    public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
         FeedConnectionId feedId = feedRuntime.getFeedRuntimeId().getFeedId();
-        ExecutorService execService = feedExecutorService.get(feedId);
-        if (execService == null) {
-            execService = Executors.newCachedThreadPool();
-            feedExecutorService.put(feedId, execService);
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
+        if (runtimeMgr == null) {
+            synchronized (feedRuntimeManagers) {
+                if (runtimeMgr == null) {
+                    runtimeMgr = new FeedRuntimeManager(feedId);
+                    feedRuntimeManagers.put(feedId, runtimeMgr);
+                }
+            }
         }
 
-        Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntime.getFeedRuntimeId()
-                .getFeedId());
-        if (feedRuntimesForFeed == null) {
-            feedRuntimesForFeed = new HashMap<FeedRuntimeId, FeedRuntime>();
-            feedRuntimes.put(feedRuntime.getFeedRuntimeId().getFeedId(), feedRuntimesForFeed);
+        runtimeMgr.registerFeedRuntime(feedRuntime.getFeedRuntimeId(), feedRuntime);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registered runtime " + feedRuntime + " for feed " + feedId);
         }
-        feedRuntimesForFeed.put(feedRuntime.getFeedRuntimeId(), feedRuntime);
-        System.out.println("REGISTERED feed runtime " + feedRuntime);
-        return execService;
     }
 
     @Override
     public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId) {
-        Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntimeId.getFeedId());
-        if (feedRuntimesForFeed != null) {
-            FeedRuntime feedRuntime = feedRuntimesForFeed.get(feedRuntimeId);
-            if (feedRuntime != null) {
-                feedRuntimesForFeed.remove(feedRuntimeId);
-                if (feedRuntimesForFeed.isEmpty()) {
-                    System.out.println("CLEARING OUT FEED RUNTIME INFO" + feedRuntimeId.getFeedId());
-                    feedRuntimes.remove(feedRuntimeId.getFeedId());
-                }
-            }
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
+        if (runtimeMgr != null) {
+            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
         }
-
     }
 
     @Override
     public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId) {
-        Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntimeId.getFeedId());
-        if (feedRuntimesForFeed != null) {
-            return feedRuntimesForFeed.get(feedRuntimeId);
-        }
-        return null;
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
+        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
     }
 
     @Override
     public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception {
-        boolean overriden = superFeedManagers.get(feedId) != null;
-        superFeedManagers.put(feedId, sfm);
-        FeedMessageService mesgService = feedMessageService.get(feedId);
-        if (overriden && mesgService != null) {
-            mesgService.stop();
-        }
-        if (mesgService == null || overriden) {
-            mesgService = new FeedMessageService(feedId);
-            feedMessageService.put(feedId, mesgService);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Started Feed Message Service for feed :" + feedId);
-            }
-            mesgService.start();
-        }
-    }
-
-    @Override
-    public void deregisterSuperFeedManager(FeedConnectionId feedId) {
-        SuperFeedManager sfm = superFeedManagers.remove(feedId);
-        try {
-            if (sfm.isLocal()) {
-                sfm.stop();
-            }
-            FeedMessageService fms = feedMessageService.remove(feedId);
-            if (fms != null) {
-                fms.stop();
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
+        if (runtimeMgr != null) {
+            runtimeMgr.setSuperFeedManager(sfm);
         }
     }
 
     @Override
     public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId) {
-        return superFeedManagers.get(feedId);
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
+        return runtimeMgr != null ? runtimeMgr.getSuperFeedManager() : null;
     }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 81cbd96..f3b99c8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -80,27 +80,15 @@
                     synchronized (FeedManager.INSTANCE) {
                         INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
                         String nodeId = ncCtx.getNodeId();
-
                         if (sfm.getNodeId().equals(nodeId)) {
-                            SuperFeedManager currentManager = FeedManager.INSTANCE.getSuperFeedManager(feedId);
-                            if (currentManager != null) {
-                                currentManager.stop();
-                                FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
-                            }
-
                             sfm.setLocal(true);
-                            sfm.start();
-                            System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
-
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Started Super Feed Manager for " + feedId);
-                            }
                         } else {
                             Thread.sleep(5000);
                         }
                         FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
-                        System.out.println("REGISTERED SUPER FEED MANAGER ! + is LOCAL ?" + sfm.isLocal());
-
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Registered super feed mgr " + sfm + " for feed " + feedId);
+                        }
                     }
                     break;
 
@@ -121,5 +109,4 @@
             writer.close();
         }
     }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index f933504..6523914 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -91,7 +91,7 @@
                 feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
                 if (feedRuntime == null) {
                     feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
-                    feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
+                    FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
                     if (LOGGER.isLoggable(Level.WARNING)) {
                         LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
                     }
@@ -100,9 +100,9 @@
                     if (LOGGER.isLoggable(Level.WARNING)) {
                         LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
                     }
-                    feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
                     resumeOldState = true;
                 }
+                feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
                 FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
                         runtimeType, partition, fta);
                 coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
index 3c151f3..0d1f955 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
@@ -1,15 +1,20 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.util.concurrent.ExecutorService;
-
 public interface IAdapterExecutor {
 
+    /**
+     * @throws Exception
+     */
     public void start() throws Exception;
 
+    /**
+     * @throws Exception
+     */
     public void stop() throws Exception;
 
+    /**
+     * @return
+     */
     public FeedConnectionId getFeedId();
 
-    public ExecutorService getFeedExecutorService();
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index 278fadd..ac33966 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -34,7 +34,7 @@
      * @param feedRuntime
      * @throws Exception
      */
-    public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
+    public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
 
     /**
      * @param feedRuntimeId
@@ -56,18 +56,13 @@
 
     /**
      * @param feedId
-     */
-    public void deregisterSuperFeedManager(FeedConnectionId feedId);
-
-    /**
-     * @param feedId
      * @return
      */
     public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId);
 
     /**
      * @param feedId
-     * @throws IOException 
+     * @throws IOException
      */
     void deregisterFeed(FeedConnectionId feedId) throws IOException;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index fcc9026..95b0da8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -41,14 +41,14 @@
 
     private final FeedConnectionId feedConnectionId;
 
-    // private MessageListener listener;
-
     private boolean isLocal = false;
 
     private SuperFeedManagerService sfmService;
 
     private LinkedBlockingQueue<String> inbox;
 
+    private boolean started = false;
+
     public enum FeedReportMessageType {
         CONGESTION,
         THROUGHPUT
@@ -93,6 +93,7 @@
             executorService.execute(sfmService);
         }
         System.out.println("STARTED SUPER FEED MANAGER!");
+        started = true;
     }
 
     public void stop() throws IOException {
@@ -102,7 +103,8 @@
 
     @Override
     public String toString() {
-        return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
+        return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]"
+                + (isLocal ? started ? "Started " : "Not Started" : " Remote ");
     }
 
     public static class SuperFeedManagerMessages {
@@ -365,4 +367,8 @@
         }
     }
 
+    public boolean isStarted() {
+        return started;
+    }
+
 }
\ No newline at end of file