mechanism for collection of feed statistics (checkpoint 1)
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
index 8955342..89af26a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
@@ -21,6 +21,12 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.imageio.ImageIO;
 import javax.servlet.http.HttpServlet;
@@ -28,17 +34,22 @@
 import javax.servlet.http.HttpServletResponse;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.RemoteSocketMessageListener;
 
 public class FeedDashboardServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
-    private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+    private static final Logger LOGGER = Logger.getLogger(FeedDashboardServlet.class.getName());
 
-    private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+    private static final char EOL = (char) "\n".getBytes()[0];
 
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
@@ -86,6 +97,7 @@
             String feedName = request.getParameter("feed");
             String datasetName = request.getParameter("dataset");
             String dataverseName = request.getParameter("dataverse");
+            FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
 
             String outStr = null;
             if (requestURI.startsWith("/webui/static")) {
@@ -97,7 +109,18 @@
                 Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverseName, feedName);
                 MetadataManager.INSTANCE.commitTransaction(ctx);
 
+                FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId,
+                        FeedActivityType.FEED_BEGIN, FeedActivityType.FEED_RESUME);
+                Map<String, String> activityDetails = activity.getFeedActivityDetails();
+
+                String host = activityDetails.get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST);
+                int port = Integer.parseInt(activityDetails
+                        .get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT));
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(" Feed Report Source :" + host + "[" + port + "]");
+                }
                 outStr = String.format(sb.toString(), dataverseName, datasetName, feedName);
+                initiateSubscription(feedId, host, port);
             }
 
             PrintWriter out = response.getWriter();
@@ -107,4 +130,28 @@
         }
     }
 
+    private void initiateSubscription(FeedConnectionId feedId, String host, int port) throws IOException {
+        LinkedBlockingQueue<String> outbox = new LinkedBlockingQueue<String>();
+        Socket sc = new Socket(host, port);
+        InputStream in = sc.getInputStream();
+
+        CharBuffer buffer = CharBuffer.allocate(50);
+        char ch = 0;
+        while (ch != EOL) {
+            buffer.put(ch);
+            ch = (char) in.read();
+        }
+        buffer.flip();
+        String s = new String(buffer.array());
+        int feedSubscriptionPort = Integer.parseInt(s.trim());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Response from Super Feed Manager Report Service " + port + " will connect at " + host + " "
+                    + port);
+        }
+
+        FeedLifecycleListener.INSTANCE.registerFeedReportQueue(feedId, outbox);
+        RemoteSocketMessageListener listener = new RemoteSocketMessageListener(host, feedSubscriptionPort, outbox);
+        listener.start();
+    }
+
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index 65b3883..1bec374 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -25,6 +26,9 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+
 public class FeedDataProviderServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
@@ -37,6 +41,8 @@
         String datasetName = request.getParameter("dataset");
         String dataverseName = request.getParameter("dataverse");
 
+        String report = getFeedReport(feedName, datasetName, dataverseName);
+        System.out.println(" RECEIVED REPORT " + report);
         JSONObject obj = new JSONObject();
         try {
             obj.put("time", System.currentTimeMillis());
@@ -49,4 +55,15 @@
         out.println(obj.toString());
     }
 
+    private String getFeedReport(String feedName, String datasetName, String dataverseName) {
+        FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
+        LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
+        String report = null;
+        try {
+            report = queue.take();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return report;
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 5f59bea..00bed93 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -110,7 +110,7 @@
     private IMessageAnalyzer healthDataParser;
     private MessageListener feedHealthDataListener;
     private ExecutorService executorService = Executors.newCachedThreadPool();
-
+    private Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
     private State state;
 
     private FeedLifecycleListener() {
@@ -177,6 +177,18 @@
 
     }
 
+    public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+        feedReportQueue.put(feedId, queue);
+    }
+
+    public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+        feedReportQueue.remove(feedId);
+    }
+
+    public LinkedBlockingQueue<String>  getFeedReportQueue(FeedConnectionId feedId) {
+        return feedReportQueue.get(feedId);
+    }
+
     private static class Message {
         public JobId jobId;
 
@@ -228,6 +240,7 @@
         private Map<JobId, FeedInfo> registeredFeeds = new HashMap<JobId, FeedInfo>();
         private FeedMessenger feedMessenger;
         private LinkedBlockingQueue<FeedMessengerMessage> messengerOutbox;
+        private int superFeedManagerPort = 3000;
 
         public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
             this.inbox = inbox;
@@ -377,13 +390,18 @@
                     throw new IllegalStateException("Unknown node " + superFeedManagerHost);
                 }
 
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, ""
+                        + superFeedManagerPort);
+
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
                             + superFeedManagerHost);
                 }
 
-                FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost, 3000,
-                        feedInfo.feedConnectionId);
+                FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
+                        superFeedManagerPort, feedInfo.feedConnectionId);
+                superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
                 messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
                 MetadataManager.INSTANCE.acquireWriteLatch();
                 MetadataTransactionContext mdTxnCtx = null;
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 72d996d..addf706 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -5,7 +5,9 @@
 
       var random = new TimeSeries();
       setInterval(function() {
-        random.append(new Date().getTime(), Math.random() * 10000);
+
+      $.get('/feed/data?dataverse=%s&dataset=%s&feed=%s', function(data) { 
+          feedSeries.append(data[“time”], data[“value”]);
       }, 500);
       
       function createTimeline() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index 639d5d1..c193053 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -55,6 +55,8 @@
         public static final String EXCEPTION_LOCATION = "exception-location";
         public static final String EXCEPTION_MESSAGE = "exception-message";
         public static final String FEED_POLICY_NAME = "feed-policy-name";
+        public static final String SUPER_FEED_MANAGER_HOST = "super-feed-manager-host";
+        public static final String SUPER_FEED_MANAGER_PORT = "super-feed-manager-port";
         public static final String FEED_NODE_FAILURE = "feed-node-failure";
 
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
index a8055f7..31eea44 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
@@ -12,6 +12,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -27,11 +28,13 @@
     private final ExecutorService executorService;
     private FeedMessageService messageService;
     private SocketFactory socketFactory = new SocketFactory();
+    private final LinkedBlockingQueue<String> feedReportQueue;
 
     public FeedRuntimeManager(FeedConnectionId feedId) {
         this.feedId = feedId;
         feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
         executorService = Executors.newCachedThreadPool();
+        feedReportQueue = new LinkedBlockingQueue<String>();
     }
 
     public void close() throws IOException {
@@ -213,4 +216,8 @@
         return feedId;
     }
 
+    public LinkedBlockingQueue<String> getFeedReportQueue() {
+        return feedReportQueue;
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
new file mode 100644
index 0000000..822c638
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class RemoteSocketMessageListener {
+
+    private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());
+
+    private final String host;
+    private final int port;
+    private final LinkedBlockingQueue<String> outbox;
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    private RemoteMessageListenerServer listenerServer;
+
+    public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
+        this.host = host;
+        this.port = port;
+        this.outbox = outbox;
+    }
+
+    public void stop() {
+        if (!executorService.isShutdown()) {
+            executorService.shutdownNow();
+        }
+        listenerServer.stop();
+
+    }
+
+    public void start() throws IOException {
+        listenerServer = new RemoteMessageListenerServer(host, port, outbox);
+        executorService.execute(listenerServer);
+    }
+
+    private static class RemoteMessageListenerServer implements Runnable {
+
+        private final String host;
+        private final int port;
+        private final LinkedBlockingQueue<String> outbox;
+        private Socket client;
+
+        public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
+            this.host = host;
+            this.port = port;
+            this.outbox = outbox;
+        }
+
+        public void stop() {
+            try {
+                client.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void run() {
+            char EOL = (char) "\n".getBytes()[0];
+            Socket client = null;
+            try {
+                client = new Socket(host, port);
+                InputStream in = client.getInputStream();
+                CharBuffer buffer = CharBuffer.allocate(5000);
+                char ch;
+                while (true) {
+                    ch = (char) in.read();
+                    if (((int) ch) == -1) {
+                        break;
+                    }
+                    while (ch != EOL) {
+                        buffer.put(ch);
+                        ch = (char) in.read();
+                    }
+                    buffer.flip();
+                    String s = new String(buffer.array());
+                    synchronized (outbox) {
+                        outbox.add(s + "\n");
+                    }
+                    buffer.position(0);
+                    buffer.limit(5000);
+                }
+
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to start Remote Message listener" + client);
+                }
+            } finally {
+                if (client != null && !client.isClosed()) {
+                    try {
+                        client.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+        }
+
+    }
+
+    private static class MessageParser implements Runnable {
+
+        private Socket client;
+        private IMessageAnalyzer messageAnalyzer;
+        private static final char EOL = (char) "\n".getBytes()[0];
+
+        public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
+            this.client = client;
+            this.messageAnalyzer = messageAnalyzer;
+        }
+
+        @Override
+        public void run() {
+            CharBuffer buffer = CharBuffer.allocate(5000);
+            char ch;
+            try {
+                InputStream in = client.getInputStream();
+                while (true) {
+                    ch = (char) in.read();
+                    if (((int) ch) == -1) {
+                        break;
+                    }
+                    while (ch != EOL) {
+                        buffer.put(ch);
+                        ch = (char) in.read();
+                    }
+                    buffer.flip();
+                    String s = new String(buffer.array());
+                    synchronized (messageAnalyzer) {
+                        messageAnalyzer.getMessageQueue().add(s + "\n");
+                    }
+                    buffer.position(0);
+                    buffer.limit(5000);
+                }
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            } finally {
+                try {
+                    client.close();
+                } catch (IOException ioe) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public static interface IMessageAnalyzer {
+
+        /**
+         * @return
+         */
+        public LinkedBlockingQueue<String> getMessageQueue();
+
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 1a3f204..c30c23e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -35,7 +35,9 @@
     private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
     private String host;
 
-    private final int port;
+    private final int feedReportPort;
+
+    private final int feedReportSubscribePort;
 
     private final String nodeId;
 
@@ -43,12 +45,16 @@
 
     private boolean isLocal = false;
 
-    private SuperFeedManagerService sfmService;
+    private SuperFeedReportService sfmService;
 
-    private LinkedBlockingQueue<String> inbox;
+    private SuperFeedReportSubscriptionService subscriptionService;
+
+    private LinkedBlockingQueue<String> feedReportInbox; ///
 
     private boolean started = false;
 
+    public static final int PORT_RANGE_ASSIGNED = 10;
+
     public enum FeedReportMessageType {
         CONGESTION,
         THROUGHPUT
@@ -57,13 +63,14 @@
     public SuperFeedManager(FeedConnectionId feedId, String host, String nodeId, int port) throws Exception {
         this.feedConnectionId = feedId;
         this.nodeId = nodeId;
-        this.port = port;
+        this.feedReportPort = port;
+        this.feedReportSubscribePort = port + 1;
         this.host = host;
-        this.inbox = new LinkedBlockingQueue<String>();
+        this.feedReportInbox = new LinkedBlockingQueue<String>();
     }
 
     public int getPort() {
-        return port;
+        return feedReportPort;
     }
 
     public String getHost() throws Exception {
@@ -89,65 +96,161 @@
     public void start() throws IOException {
         if (sfmService == null) {
             ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedConnectionId);
-            sfmService = new SuperFeedManagerService(port, inbox, feedConnectionId);
+            sfmService = new SuperFeedReportService(feedReportPort, feedReportInbox, feedConnectionId);
             executorService.execute(sfmService);
+            subscriptionService = new SuperFeedReportSubscriptionService(feedConnectionId, feedReportSubscribePort,
+                    sfmService.getMesgAnalyzer());
+            executorService.execute(subscriptionService);
         }
-        System.out.println("STARTED SUPER FEED MANAGER!");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Started super feed manager! " + this);
+        }
         started = true;
     }
 
     public void stop() throws IOException {
         sfmService.stop();
-        System.out.println("STOPPED SUPER FEED MANAGER!");
+        subscriptionService.stop();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Stopped super feed manager! " + this);
+        }
     }
 
     @Override
     public String toString() {
-        return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]"
+        return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + feedReportPort + "]"
                 + (isLocal ? started ? "Started " : "Not Started" : " Remote ");
     }
 
-    public static class SuperFeedManagerMessages {
+    private static class SuperFeedReportSubscriptionService implements Runnable {
 
-        private static final String EOL = "\n";
+        private final FeedConnectionId feedId;
+        private ServerSocket serverFeedSubscribe;
+        private int subscriptionPort;
+        private boolean active = true;
+        private String EOM = "\n";
+        private final SFMessageAnalyzer reportProvider;
+        private final List<FeedDataProviderService> dataProviders = new ArrayList<FeedDataProviderService>();
 
-        public enum MessageType {
-            FEED_PORT_REQUEST,
-            FEED_PORT_RESPONSE
+        public SuperFeedReportSubscriptionService(FeedConnectionId feedId, int port, SFMessageAnalyzer reportProvider)
+                throws IOException {
+            this.feedId = feedId;
+            serverFeedSubscribe = FeedManager.INSTANCE.getFeedRuntimeManager(feedId).createServerSocket(port);
+            this.subscriptionPort = port + 1;
+            this.reportProvider = reportProvider;
         }
 
-        public static final byte[] SEND_PORT_REQUEST = (MessageType.FEED_PORT_REQUEST.name() + EOL).getBytes();
+        public void stop() {
+            active = false;
+            for (FeedDataProviderService dataProviderService : dataProviders) {
+                dataProviderService.stop();
+            }
+        }
+
+        @Override
+        public void run() {
+            while (active) {
+                try {
+                    Socket client = serverFeedSubscribe.accept();
+                    OutputStream os = client.getOutputStream();
+                    subscriptionPort++;
+                    LinkedBlockingQueue<String> reportInbox = new LinkedBlockingQueue<String>();
+                    reportProvider.registerSubsription(reportInbox);
+                    FeedDataProviderService dataProviderService = new FeedDataProviderService(feedId, subscriptionPort,
+                            reportInbox);
+                    dataProviders.add(dataProviderService);
+                    FeedManager.INSTANCE.getFeedRuntimeManager(feedId).getExecutorService()
+                            .execute(dataProviderService);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Recevied subscription request for feed :" + feedId
+                                + " Subscripton available at port " + subscriptionPort);
+                    }
+                    os.write((subscriptionPort + EOM).getBytes());
+                    os.flush();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
-    private static class SuperFeedManagerService implements Runnable {
+    private static class FeedDataProviderService implements Runnable {
+
+        private final FeedConnectionId feedId;
+        private int subscriptionPort;
+        private ServerSocket dataProviderSocket;
+        private LinkedBlockingQueue<String> inbox;
+        private boolean active = true;
+        private String EOM = "\n";
+
+        public FeedDataProviderService(FeedConnectionId feedId, int port, LinkedBlockingQueue<String> inbox)
+                throws IOException {
+            this.feedId = feedId;
+            this.subscriptionPort = port;
+            this.inbox = inbox;
+            dataProviderSocket = FeedManager.INSTANCE.getFeedRuntimeManager(feedId).createServerSocket(port);
+        }
+
+        @Override
+        public void run() {
+            try {
+                Socket client = dataProviderSocket.accept();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Subscriber to " + feedId + " data connected");
+                }
+                OutputStream os = client.getOutputStream();
+                while (active) {
+                    String message = inbox.take();
+                    os.write((message + EOM).getBytes());
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void stop() {
+            active = false;
+        }
+
+        @Override
+        public String toString() {
+            return "DATA_PROVIDER_" + feedId + "[" + subscriptionPort + "]";
+        }
+
+    }
+
+    private static class SuperFeedReportService implements Runnable {
 
         private int nextPort;
-        private ServerSocket server;
+        private final ServerSocket serverFeedReport;
+
         private String EOM = "\n";
 
         private final LinkedBlockingQueue<String> inbox;
-        private List<MessageListener> messageListeners;
-        private SFMessageAnalyzer mesgAnalyzer;
+        private final List<MessageListener> messageListeners;
+        private final SFMessageAnalyzer mesgAnalyzer;
         private final FeedConnectionId feedId;
         private boolean process = true;
 
-        public SuperFeedManagerService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
+        public SuperFeedReportService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
                 throws IOException {
             FeedRuntimeManager runtimeManager = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
-            server = runtimeManager.createServerSocket(port);
+            serverFeedReport = runtimeManager.createServerSocket(port);
             nextPort = port;
             this.inbox = inbox;
             this.feedId = feedId;
             this.messageListeners = new ArrayList<MessageListener>();
-            mesgAnalyzer = new SFMessageAnalyzer(inbox);
+            mesgAnalyzer = new SFMessageAnalyzer(inbox, feedId);
             FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgAnalyzer);
         }
 
         public void stop() {
             process = false;
-            if (server != null) {
+            if (serverFeedReport != null) {
                 try {
-                    server.close();
+                    serverFeedReport.close();
                     process = false;
                 } catch (IOException e) {
                     e.printStackTrace();
@@ -161,7 +264,7 @@
             Socket client = null;
             while (process) {
                 try {
-                    client = server.accept();
+                    client = serverFeedReport.accept();
                     OutputStream os = client.getOutputStream();
                     nextPort++;
                     MessageListener listener = new MessageListener(nextPort, inbox);
@@ -185,6 +288,10 @@
             }
         }
 
+        public SFMessageAnalyzer getMesgAnalyzer() {
+            return mesgAnalyzer;
+        }
+
     }
 
     private static class SFMessageAnalyzer implements Runnable {
@@ -192,19 +299,33 @@
         private final LinkedBlockingQueue<String> inbox;
         private final Socket socket;
         private final OutputStream os;
+        private final FeedConnectionId feedId;
         private boolean process = true;
+        private final List<LinkedBlockingQueue<String>> subscriptionQueues;
 
-        public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox) throws UnknownHostException, IOException {
+        public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
+                throws UnknownHostException, IOException {
             this.inbox = inbox;
+            this.feedId = feedId;
+            this.subscriptionQueues = new ArrayList<LinkedBlockingQueue<String>>();
             String ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
-            socket = null; //new Socket(ccHost, 2999);
-            os = null; //socket.getOutputStream();
+            FeedRuntimeManager runtimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
+            socket = runtimeMgr.createClientSocket(ccHost, 2999, 5000);
+            os = socket.getOutputStream();
         }
 
         public void stop() {
             process = false;
         }
 
+        public void registerSubsription(LinkedBlockingQueue<String> subscriptionQueue) {
+            subscriptionQueues.add(subscriptionQueue);
+        }
+
+        public void deregisterSubsription(LinkedBlockingQueue<String> subscriptionQueue) {
+            subscriptionQueues.remove(subscriptionQueue);
+        }
+
         public void run() {
             while (process) {
                 try {
@@ -214,10 +335,13 @@
                     switch (mesgType) {
                         case THROUGHPUT:
                             //send message to FeedHealthDataReceiver at CC (2999)
-                            //          os.write(message.getBytes());
+                            os.write(message.getBytes());
                             if (LOGGER.isLoggable(Level.INFO)) {
                                 LOGGER.warning("SuperFeedManager received message " + message);
                             }
+                            for (LinkedBlockingQueue<String> q : subscriptionQueues) {
+                                q.add(message);
+                            }
                             break;
                         case CONGESTION:
                             // congestionInbox.add(report);