1) fixed bug in super feed manager (de) initialization 2) experimneting with feed manager
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
new file mode 100644
index 0000000..d035610
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+
+import javax.imageio.ImageIO;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+
+public class FeedDashboardServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+
+    private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        String resourcePath = null;
+        String requestURI = request.getRequestURI();
+
+        if (requestURI.equals("/")) {
+            response.setContentType("text/html");
+            resourcePath = "/feed/dashboard.html";
+        } else {
+            resourcePath = requestURI + ".html";
+        }
+
+        try {
+            InputStream is = FeedDashboardServlet.class.getResourceAsStream(resourcePath);
+            if (is == null) {
+                response.sendError(HttpServletResponse.SC_NOT_FOUND);
+                return;
+            }
+
+            // Special handler for font files and .png resources
+            if (resourcePath.endsWith(".png")) {
+
+                BufferedImage img = ImageIO.read(is);
+                OutputStream outputStream = response.getOutputStream();
+                String formatName = "png";
+                response.setContentType("image/png");
+                ImageIO.write(img, formatName, outputStream);
+                outputStream.close();
+                return;
+
+            }
+
+            response.setCharacterEncoding("utf-8");
+            InputStreamReader isr = new InputStreamReader(is);
+            StringBuilder sb = new StringBuilder();
+            BufferedReader br = new BufferedReader(isr);
+            String line = br.readLine();
+
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+            }
+
+            String feedName = request.getParameter("feed");
+            String datasetName = request.getParameter("dataset");
+            String dataverseName = request.getParameter("dataverse");
+
+            String outStr = null;
+            if (requestURI.startsWith("/webui/static")) {
+                outStr = sb.toString();
+            } else {
+                MetadataManager.INSTANCE.init();
+                MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+
+                Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverseName, feedName);
+                MetadataManager.INSTANCE.commitTransaction(ctx);
+
+                StringBuilder ldStr = new StringBuilder();
+                ldStr.append("Feed " + feed.getDataverseName() + " " + feed.getFeedName() + " " + feed.getAdaptorName());
+                ldStr.append("<br />");
+                ldStr.append("<br />");
+                ldStr.append("Graph");
+                ldStr.append("<br />");
+                ldStr.append("<br />");
+
+                outStr = String.format(sb.toString(), dataverseName, datasetName, feedName);
+
+            }
+
+            PrintWriter out = response.getWriter();
+            out.println(outStr);
+        } catch (ACIDException | MetadataException e) {
+            e.printStackTrace();
+        }
+    }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
new file mode 100644
index 0000000..65b3883
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Random;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class FeedDataProviderServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    private Random random = new Random();
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+
+        String feedName = request.getParameter("feed");
+        String datasetName = request.getParameter("dataset");
+        String dataverseName = request.getParameter("dataverse");
+
+        JSONObject obj = new JSONObject();
+        try {
+            obj.put("time", System.currentTimeMillis());
+            obj.put("value", random.nextInt(100) + "");
+        } catch (JSONException jsoe) {
+            throw new IOException(jsoe);
+        }
+
+        PrintWriter out = response.getWriter();
+        out.println(obj.toString());
+    }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 74c2c8a..5f59bea 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -41,8 +41,8 @@
 import edu.uci.ics.asterix.aql.expression.Identifier;
 import edu.uci.ics.asterix.aql.translator.AqlTranslator;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.schema.cluster.Cluster;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
 import edu.uci.ics.asterix.file.JobSpecificationUtils;
 import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure.FailureType;
 import edu.uci.ics.asterix.metadata.MetadataException;
@@ -73,11 +73,9 @@
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -121,7 +119,7 @@
         responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
         feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
         this.healthDataParser = new FeedHealthDataParser();
-        feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser);
+        feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser.getMessageQueue());
         try {
             feedHealthDataListener.start();
         } catch (Exception e) {
@@ -214,9 +212,11 @@
 
     private static class FeedHealthDataParser implements IMessageAnalyzer {
 
+        private LinkedBlockingQueue<String> inbox = new LinkedBlockingQueue<String>();
+
         @Override
-        public void receiveMessage(String message) {
-            System.out.println(" HEALTH DATA RECEIVED :" + message);
+        public LinkedBlockingQueue<String> getMessageQueue() {
+            return inbox;
         }
 
     }
@@ -363,11 +363,27 @@
                 int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
                 String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
 
-                SuperFeedManager sfm = new SuperFeedManager(feedInfo.feedConnectionId, superFeedManagerHost, 3000);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + sfm);
+                Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
+                String instanceName = cluster.getInstanceName();
+                String node = superFeedManagerHost.substring(instanceName.length() + 1);
+                String hostIp = null;
+                for (Node n : cluster.getNode()) {
+                    if (n.getId().equals(node)) {
+                        hostIp = n.getClusterIp();
+                        break;
+                    }
                 }
-                FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(sfm);
+                if (hostIp == null) {
+                    throw new IllegalStateException("Unknown node " + superFeedManagerHost);
+                }
+
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
+                            + superFeedManagerHost);
+                }
+
+                FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost, 3000,
+                        feedInfo.feedConnectionId);
                 messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
                 MetadataManager.INSTANCE.acquireWriteLatch();
                 MetadataTransactionContext mdTxnCtx = null;
@@ -745,7 +761,7 @@
 
             IOperatorDescriptor feedMessenger;
             AlgebricksPartitionConstraint messengerPc;
-            List<String> locations = new ArrayList<String>();
+            Set<String> locations = new HashSet<String>();
             locations.addAll(feedInfo.computeLocations);
             locations.addAll(feedInfo.ingestLocations);
             locations.addAll(feedInfo.storageLocations);
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
new file mode 100644
index 0000000..6576f75
--- /dev/null
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -0,0 +1,27 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <script type="text/javascript" src="/webui/static/js/smoothie.js"></script>
+    <script type="text/javascript"></script>
+  </head>
+  <body>
+    <canvas id="chart" width="400" height="100"></canvas>
+
+    <script type="text/javascript">
+
+    // Random data
+    
+    var feedSeries = new TimeSeries();
+    setInterval(function() {
+        line1.append(new Date().getTime(), Math.random());
+        line2.append(new Date().getTime(), Math.random());
+      }, 1000);
+  
+ var smoothie = new SmoothieChart({ grid: { strokeStyle: 'rgb(125, 0, 0)', fillStyle: 'rgb(60, 0, 0)', lineWidth: 1, millisPerLine: 250, verticalSections: 6 } });
+      smoothie.addTimeSeries(line1, { strokeStyle: 'rgb(0, 255, 0)', fillStyle: 'rgba(0, 255, 0, 0.4)', lineWidth: 3 });
+      smoothie.addTimeSeries(line2, { strokeStyle: 'rgb(255, 0, 255)', fillStyle: 'rgba(255, 0, 255, 0.3)', lineWidth: 3 });
+      smoothie.streamTo(document.getElementById("mycanvas"), 1000);
+
+    </script>
+  </body>
+</html>
diff --git a/asterix-app/src/main/resources/feed/home.html b/asterix-app/src/main/resources/feed/home.html
new file mode 100644
index 0000000..6c74cad
--- /dev/null
+++ b/asterix-app/src/main/resources/feed/home.html
@@ -0,0 +1,83 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta name="description" content="ASTERIX WEB PAGE" />
+<meta name="viewport" content="width=device-width, initial-scale=1.0">
+<link href='http://fonts.googleapis.com/css?family=Bitter|PT+Sans+Caption|Open+Sans' rel='stylesheet' type='text/css'>
+<script src="/webui/static/js/jquery.min.js"></script>
+
+<link href="/webui/static/css/bootstrap.min.css" rel="stylesheet" type="text/css" />
+<link href="/webui/static/css/bootstrap-responsive.min.css" rel="stylesheet" type="text/css" />
+
+<script src="/webui/static/js/bootstrap.min.js"></script>
+
+<link href="/webui/static/css/style.css" rel="stylesheet" type="text/css" />
+
+
+<meta charset=utf-8 />
+<title>AsterixDB Web Interface</title>
+</head>
+
+<body>
+  <div class="navbar navbar-fixed-top">
+    <div class="navbar-inner">
+      <div class="container">
+        <a class="btn btn-navbar" data-toggle="collapse" data-target=".nav-collapse">
+          <span class="icon-bar"></span>
+          <span class="icon-bar"></span>
+          <span class="icon-bar"></span>
+        </a>
+
+        <!-- Temporary logo placeholder -->
+        <a class="brand" href="#"><img src="/webui/static/img/finalasterixlogo.png"></a>
+
+        <div class="nav-collapse collapse">
+          <ul class="nav">
+            <li><a href="http://code.google.com/p/asterixdb/" target="_blank">
+                    Open source<img class="extarget" src="/webui/static/img/targetlink.png"/></a></li>
+            <li><a href="http://code.google.com/p/asterixdb/issues/list" target="_blank">
+                    File issues<img class="extarget" src="/webui/static/img/targetlink.png"/></a></li>
+            <li><a href="https://groups.google.com/forum/?fromgroups#!forum/asterixdb-users" target="_blank">
+                    Contact<img class="extarget" src="/webui/static/img/targetlink.png"/></a></li>
+          </ul>
+        </div><!--/.nav-collapse -->
+      </div>
+    </div>
+  </div>
+
+  <div class="content">
+    <div class="container">
+      <div class="row-fluid">
+       <div class="span12">
+       %s
+       </div>
+
+      </div>
+    </div>
+</div>
+  <div class="footer">
+    <section class="line"><hr></section>
+    <section class="content">
+      <section class="left">
+      </section>
+      <section class="right">
+        &copy; Copyright 2013 University of California, Irvine
+      </section>
+    </section>
+  </div>
+</body>
+</html>
diff --git a/asterix-app/src/main/resources/webui/static/js/smoothie.js b/asterix-app/src/main/resources/webui/static/js/smoothie.js
new file mode 100644
index 0000000..4e46fa7
--- /dev/null
+++ b/asterix-app/src/main/resources/webui/static/js/smoothie.js
@@ -0,0 +1,660 @@
+// MIT License:
+//
+// Copyright (c) 2010-2013, Joe Walnes
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+/**
+ * Smoothie Charts - http://smoothiecharts.org/
+ * (c) 2010-2013, Joe Walnes
+ *     2013, Drew Noakes
+ *
+ * v1.0: Main charting library, by Joe Walnes
+ * v1.1: Auto scaling of axis, by Neil Dunn
+ * v1.2: fps (frames per second) option, by Mathias Petterson
+ * v1.3: Fix for divide by zero, by Paul Nikitochkin
+ * v1.4: Set minimum, top-scale padding, remove timeseries, add optional timer to reset bounds, by Kelley Reynolds
+ * v1.5: Set default frames per second to 50... smoother.
+ *       .start(), .stop() methods for conserving CPU, by Dmitry Vyal
+ *       options.interpolation = 'bezier' or 'line', by Dmitry Vyal
+ *       options.maxValue to fix scale, by Dmitry Vyal
+ * v1.6: minValue/maxValue will always get converted to floats, by Przemek Matylla
+ * v1.7: options.grid.fillStyle may be a transparent color, by Dmitry A. Shashkin
+ *       Smooth rescaling, by Kostas Michalopoulos
+ * v1.8: Set max length to customize number of live points in the dataset with options.maxDataSetLength, by Krishna Narni
+ * v1.9: Display timestamps along the bottom, by Nick and Stev-io
+ *       (https://groups.google.com/forum/?fromgroups#!topic/smoothie-charts/-Ywse8FCpKI%5B1-25%5D)
+ *       Refactored by Krishna Narni, to support timestamp formatting function
+ * v1.10: Switch to requestAnimationFrame, removed the now obsoleted options.fps, by Gergely Imreh
+ * v1.11: options.grid.sharpLines option added, by @drewnoakes
+ *        Addressed warning seen in Firefox when seriesOption.fillStyle undefined, by @drewnoakes
+ * v1.12: Support for horizontalLines added, by @drewnoakes
+ *        Support for yRangeFunction callback added, by @drewnoakes
+ * v1.13: Fixed typo (#32), by @alnikitich
+ * v1.14: Timer cleared when last TimeSeries removed (#23), by @davidgaleano
+ *        Fixed diagonal line on chart at start/end of data stream, by @drewnoakes
+ * v1.15: Support for npm package (#18), by @dominictarr
+ *        Fixed broken removeTimeSeries function (#24) by @davidgaleano
+ *        Minor performance and tidying, by @drewnoakes
+ * v1.16: Bug fix introduced in v1.14 relating to timer creation/clearance (#23), by @drewnoakes
+ *        TimeSeries.append now deals with out-of-order timestamps, and can merge duplicates, by @zacwitte (#12)
+ *        Documentation and some local variable renaming for clarity, by @drewnoakes
+ * v1.17: Allow control over font size (#10), by @drewnoakes
+ *        Timestamp text won't overlap, by @drewnoakes
+ * v1.18: Allow control of max/min label precision, by @drewnoakes
+ *        Added 'borderVisible' chart option, by @drewnoakes
+ *        Allow drawing series with fill but no stroke (line), by @drewnoakes
+ */
+
+;(function(exports) {
+
+  var Util = {
+    extend: function() {
+      arguments[0] = arguments[0] || {};
+      for (var i = 1; i < arguments.length; i++)
+      {
+        for (var key in arguments[i])
+        {
+          if (arguments[i].hasOwnProperty(key))
+          {
+            if (typeof(arguments[i][key]) === 'object') {
+              if (arguments[i][key] instanceof Array) {
+                arguments[0][key] = arguments[i][key];
+              } else {
+                arguments[0][key] = Util.extend(arguments[0][key], arguments[i][key]);
+              }
+            } else {
+              arguments[0][key] = arguments[i][key];
+            }
+          }
+        }
+      }
+      return arguments[0];
+    }
+  };
+
+  /**
+   * Initialises a new <code>TimeSeries</code> with optional data options.
+   *
+   * Options are of the form (defaults shown):
+   *
+   * <pre>
+   * {
+   *   resetBounds: true,        // enables/disables automatic scaling of the y-axis
+   *   resetBoundsInterval: 3000 // the period between scaling calculations, in millis
+   * }
+   * </pre>
+   *
+   * Presentation options for TimeSeries are specified as an argument to <code>SmoothieChart.addTimeSeries</code>.
+   *
+   * @constructor
+   */
+  function TimeSeries(options) {
+    this.options = Util.extend({}, TimeSeries.defaultOptions, options);
+    this.data = [];
+    this.maxValue = Number.NaN; // The maximum value ever seen in this TimeSeries.
+    this.minValue = Number.NaN; // The minimum value ever seen in this TimeSeries.
+  }
+
+  TimeSeries.defaultOptions = {
+    resetBoundsInterval: 3000,
+    resetBounds: true
+  };
+
+  /**
+   * Recalculate the min/max values for this <code>TimeSeries</code> object.
+   *
+   * This causes the graph to scale itself in the y-axis.
+   */
+  TimeSeries.prototype.resetBounds = function() {
+    if (this.data.length) {
+      // Walk through all data points, finding the min/max value
+      this.maxValue = this.data[0][1];
+      this.minValue = this.data[0][1];
+      for (var i = 1; i < this.data.length; i++) {
+        var value = this.data[i][1];
+        if (value > this.maxValue) {
+          this.maxValue = value;
+        }
+        if (value < this.minValue) {
+          this.minValue = value;
+        }
+      }
+    } else {
+      // No data exists, so set min/max to NaN
+      this.maxValue = Number.NaN;
+      this.minValue = Number.NaN;
+    }
+  };
+
+  /**
+   * Adds a new data point to the <code>TimeSeries</code>, preserving chronological order.
+   *
+   * @param timestamp the position, in time, of this data point
+   * @param value the value of this data point
+   * @param sumRepeatedTimeStampValues if <code>timestamp</code> has an exact match in the series, this flag controls
+   * whether it is replaced, or the values summed (defaults to false.)
+   */
+  TimeSeries.prototype.append = function(timestamp, value, sumRepeatedTimeStampValues) {
+    // Rewind until we hit an older timestamp
+    var i = this.data.length - 1;
+    while (i > 0 && this.data[i][0] > timestamp) {
+      i--;
+    }
+
+    if (this.data.length > 0 && this.data[i][0] === timestamp) {
+      // Update existing values in the array
+      if (sumRepeatedTimeStampValues) {
+        // Sum this value into the existing 'bucket'
+        this.data[i][1] += value;
+        value = this.data[i][1];
+      } else {
+        // Replace the previous value
+        this.data[i][1] = value;
+      }
+    } else if (i < this.data.length - 1) {
+      // Splice into the correct position to keep timestamps in order
+      this.data.splice(i + 1, 0, [timestamp, value]);
+    } else {
+      // Add to the end of the array
+      this.data.push([timestamp, value]);
+    }
+
+    this.maxValue = isNaN(this.maxValue) ? value : Math.max(this.maxValue, value);
+    this.minValue = isNaN(this.minValue) ? value : Math.min(this.minValue, value);
+  };
+
+  TimeSeries.prototype.dropOldData = function(oldestValidTime, maxDataSetLength) {
+    // We must always keep one expired data point as we need this to draw the
+    // line that comes into the chart from the left, but any points prior to that can be removed.
+    var removeCount = 0;
+    while (this.data.length - removeCount >= maxDataSetLength && this.data[removeCount + 1][0] < oldestValidTime) {
+      removeCount++;
+    }
+    if (removeCount !== 0) {
+      this.data.splice(0, removeCount);
+    }
+  };
+
+  /**
+   * Initialises a new <code>SmoothieChart</code>.
+   *
+   * Options are optional, and should be of the form below. Just specify the values you
+   * need and the rest will be given sensible defaults as shown:
+   *
+   * <pre>
+   * {
+   *   minValue: undefined,        // specify to clamp the lower y-axis to a given value
+   *   maxValue: undefined,        // specify to clamp the upper y-axis to a given value
+   *   maxValueScale: 1,           // allows proportional padding to be added above the chart. for 10% padding, specify 1.1.
+   *   yRangeFunction: undefined,  // function({min: , max: }) { return {min: , max: }; }
+   *   scaleSmoothing: 0.125,      // controls the rate at which y-value zoom animation occurs
+   *   millisPerPixel: 20,         // sets the speed at which the chart pans by
+   *   maxDataSetLength: 2,
+   *   interpolation: 'bezier'     // or 'linear'
+   *   timestampFormatter: null,   // Optional function to format time stamps for bottom of chart. You may use SmoothieChart.timeFormatter, or your own: function(date) { return ''; }
+   *   horizontalLines: [],        // [ { value: 0, color: '#ffffff', lineWidth: 1 } ],
+   *   grid:
+   *   {
+   *     fillStyle: '#000000',     // the background colour of the chart
+   *     lineWidth: 1,             // the pixel width of grid lines
+   *     strokeStyle: '#777777',   // colour of grid lines
+   *     millisPerLine: 1000,      // distance between vertical grid lines
+   *     sharpLines: false,        // controls whether grid lines are 1px sharp, or softened
+   *     verticalSections: 2,      // number of vertical sections marked out by horizontal grid lines
+   *     borderVisible: true       // whether the grid lines trace the border of the chart or not
+   *   },
+   *   labels
+   *   {
+   *     disabled: false,          // enables/disables labels showing the min/max values
+   *     fillStyle: '#ffffff',     // colour for text of labels,
+   *     fontSize: 15,
+   *     fontFamily: 'sans-serif',
+   *     precision: 2
+   *   },
+   * }
+   * </pre>
+   *
+   * @constructor
+   */
+  function SmoothieChart(options) {
+    this.options = Util.extend({}, SmoothieChart.defaultChartOptions, options);
+    this.seriesSet = [];
+    this.currentValueRange = 1;
+    this.currentVisMinValue = 0;
+  }
+
+  SmoothieChart.defaultChartOptions = {
+    millisPerPixel: 20,
+    maxValueScale: 1,
+    interpolation: 'bezier',
+    scaleSmoothing: 0.125,
+    maxDataSetLength: 2,
+    grid: {
+      fillStyle: '#000000',
+      strokeStyle: '#777777',
+      lineWidth: 1,
+      sharpLines: false,
+      millisPerLine: 1000,
+      verticalSections: 2,
+      borderVisible: true
+    },
+    labels: {
+      fillStyle: '#ffffff',
+      disabled: false,
+      fontSize: 10,
+      fontFamily: 'monospace',
+      precision: 2
+    },
+    horizontalLines: []
+  };
+
+  // Based on http://inspirit.github.com/jsfeat/js/compatibility.js
+  SmoothieChart.AnimateCompatibility = (function() {
+    // TODO this global variable will cause bugs if more than one chart is used and the browser does not support *requestAnimationFrame natively
+    var lastTime = 0,
+        requestAnimationFrame = function(callback, element) {
+          var requestAnimationFrame =
+            window.requestAnimationFrame        ||
+            window.webkitRequestAnimationFrame  ||
+            window.mozRequestAnimationFrame     ||
+            window.oRequestAnimationFrame       ||
+            window.msRequestAnimationFrame      ||
+            function(callback) {
+              var currTime = new Date().getTime(),
+                  timeToCall = Math.max(0, 16 - (currTime - lastTime)),
+                  id = window.setTimeout(function() {
+                    callback(currTime + timeToCall);
+                  }, timeToCall);
+              lastTime = currTime + timeToCall;
+              return id;
+            };
+          return requestAnimationFrame.call(window, callback, element);
+        },
+        cancelAnimationFrame = function(id) {
+          var cancelAnimationFrame =
+            window.cancelAnimationFrame ||
+            function(id) {
+              clearTimeout(id);
+            };
+          return cancelAnimationFrame.call(window, id);
+        };
+
+    return {
+      requestAnimationFrame: requestAnimationFrame,
+      cancelAnimationFrame: cancelAnimationFrame
+    };
+  })();
+
+  SmoothieChart.defaultSeriesPresentationOptions = {
+    lineWidth: 1,
+    strokeStyle: '#ffffff'
+  };
+
+  /**
+   * Adds a <code>TimeSeries</code> to this chart, with optional presentation options.
+   *
+   * Presentation options should be of the form (defaults shown):
+   *
+   * <pre>
+   * {
+   *   lineWidth: 1,
+   *   strokeStyle: '#ffffff',
+   *   fillStyle: undefined
+   * }
+   * </pre>
+   */
+  SmoothieChart.prototype.addTimeSeries = function(timeSeries, options) {
+    this.seriesSet.push({timeSeries: timeSeries, options: Util.extend({}, SmoothieChart.defaultSeriesPresentationOptions, options)});
+    if (timeSeries.options.resetBounds && timeSeries.options.resetBoundsInterval > 0) {
+      timeSeries.resetBoundsTimerId = setInterval(
+        function() {
+          timeSeries.resetBounds();
+        },
+        timeSeries.options.resetBoundsInterval
+      );
+    }
+  };
+
+  /**
+   * Removes the specified <code>TimeSeries</code> from the chart.
+   */
+  SmoothieChart.prototype.removeTimeSeries = function(timeSeries) {
+    // Find the correct timeseries to remove, and remove it
+    var numSeries = this.seriesSet.length;
+    for (var i = 0; i < numSeries; i++) {
+      if (this.seriesSet[i].timeSeries === timeSeries) {
+        this.seriesSet.splice(i, 1);
+        break;
+      }
+    }
+    // If a timer was operating for that timeseries, remove it
+    if (timeSeries.resetBoundsTimerId) {
+      // Stop resetting the bounds, if we were
+      clearInterval(timeSeries.resetBoundsTimerId);
+    }
+  };
+
+  /**
+   * Instructs the <code>SmoothieChart</code> to start rendering to the provided canvas, with specified delay.
+   *
+   * @param canvas the target canvas element
+   * @param delayMillis an amount of time to wait before a data point is shown. This can prevent the end of the series
+   * from appearing on screen, with new values flashing into view, at the expense of some latency.
+   */
+  SmoothieChart.prototype.streamTo = function(canvas, delayMillis) {
+    this.canvas = canvas;
+    this.delay = delayMillis;
+    this.start();
+  };
+
+  /**
+   * Starts the animation of this chart.
+   */
+  SmoothieChart.prototype.start = function() {
+    if (this.frame) {
+      // We're already running, so just return
+      return;
+    }
+
+    // Renders a frame, and queues the next frame for later rendering
+    var animate = function() {
+      this.frame = SmoothieChart.AnimateCompatibility.requestAnimationFrame(function() {
+        this.render();
+        animate();
+      }.bind(this));
+    }.bind(this);
+
+    animate();
+  };
+
+  /**
+   * Stops the animation of this chart.
+   */
+  SmoothieChart.prototype.stop = function() {
+    if (this.frame) {
+      SmoothieChart.AnimateCompatibility.cancelAnimationFrame(this.frame);
+      delete this.frame;
+    }
+  };
+
+  SmoothieChart.prototype.updateValueRange = function() {
+    // Calculate the current scale of the chart, from all time series.
+    var chartOptions = this.options,
+        chartMaxValue = Number.NaN,
+        chartMinValue = Number.NaN;
+
+    for (var d = 0; d < this.seriesSet.length; d++) {
+      // TODO(ndunn): We could calculate / track these values as they stream in.
+      var timeSeries = this.seriesSet[d].timeSeries;
+      if (!isNaN(timeSeries.maxValue)) {
+        chartMaxValue = !isNaN(chartMaxValue) ? Math.max(chartMaxValue, timeSeries.maxValue) : timeSeries.maxValue;
+      }
+
+      if (!isNaN(timeSeries.minValue)) {
+        chartMinValue = !isNaN(chartMinValue) ? Math.min(chartMinValue, timeSeries.minValue) : timeSeries.minValue;
+      }
+    }
+
+    // Scale the chartMaxValue to add padding at the top if required
+    if (chartOptions.maxValue != null) {
+      chartMaxValue = chartOptions.maxValue;
+    } else {
+      chartMaxValue *= chartOptions.maxValueScale;
+    }
+
+    // Set the minimum if we've specified one
+    if (chartOptions.minValue != null) {
+      chartMinValue = chartOptions.minValue;
+    }
+
+    // If a custom range function is set, call it
+    if (this.options.yRangeFunction) {
+      var range = this.options.yRangeFunction({min: chartMinValue, max: chartMaxValue});
+      chartMinValue = range.min;
+      chartMaxValue = range.max;
+    }
+
+    if (!isNaN(chartMaxValue) && !isNaN(chartMinValue)) {
+      var targetValueRange = chartMaxValue - chartMinValue;
+      this.currentValueRange += chartOptions.scaleSmoothing * (targetValueRange - this.currentValueRange);
+      this.currentVisMinValue += chartOptions.scaleSmoothing * (chartMinValue - this.currentVisMinValue);
+    }
+
+    this.valueRange = { min: chartMinValue, max: chartMaxValue };
+  };
+
+  SmoothieChart.prototype.render = function(canvas, time) {
+    canvas = canvas || this.canvas;
+    time = time || new Date().getTime() - (this.delay || 0);
+
+    // TODO only render if the chart has moved at least 1px since the last rendered frame
+
+    // Round time down to pixel granularity, so motion appears smoother.
+    time -= time % this.options.millisPerPixel;
+
+    var context = canvas.getContext('2d'),
+        chartOptions = this.options,
+        dimensions = { top: 0, left: 0, width: canvas.clientWidth, height: canvas.clientHeight },
+        // Calculate the threshold time for the oldest data points.
+        oldestValidTime = time - (dimensions.width * chartOptions.millisPerPixel),
+        valueToYPixel = function(value) {
+          var offset = value - this.currentVisMinValue;
+          return this.currentValueRange === 0
+            ? dimensions.height
+            : dimensions.height - (Math.round((offset / this.currentValueRange) * dimensions.height));
+        }.bind(this),
+        timeToXPixel = function(t) {
+          return Math.round(dimensions.width - ((time - t) / chartOptions.millisPerPixel));
+        };
+
+    this.updateValueRange();
+
+    context.font = chartOptions.labels.fontSize + 'px ' + chartOptions.labels.fontFamily;
+
+    // Save the state of the canvas context, any transformations applied in this method
+    // will get removed from the stack at the end of this method when .restore() is called.
+    context.save();
+
+    // Move the origin.
+    context.translate(dimensions.left, dimensions.top);
+
+    // Create a clipped rectangle - anything we draw will be constrained to this rectangle.
+    // This prevents the occasional pixels from curves near the edges overrunning and creating
+    // screen cheese (that phrase should need no explanation).
+    context.beginPath();
+    context.rect(0, 0, dimensions.width, dimensions.height);
+    context.clip();
+
+    // Clear the working area.
+    context.save();
+    context.fillStyle = chartOptions.grid.fillStyle;
+    context.clearRect(0, 0, dimensions.width, dimensions.height);
+    context.fillRect(0, 0, dimensions.width, dimensions.height);
+    context.restore();
+
+    // Grid lines...
+    context.save();
+    context.lineWidth = chartOptions.grid.lineWidth;
+    context.strokeStyle = chartOptions.grid.strokeStyle;
+    // Vertical (time) dividers.
+    if (chartOptions.grid.millisPerLine > 0) {
+      var textUntilX = dimensions.width - context.measureText(minValueString).width + 4;
+      for (var t = time - (time % chartOptions.grid.millisPerLine);
+           t >= oldestValidTime;
+           t -= chartOptions.grid.millisPerLine) {
+        var gx = timeToXPixel(t);
+        if (chartOptions.grid.sharpLines) {
+          gx -= 0.5;
+        }
+        context.beginPath();
+        context.moveTo(gx, 0);
+        context.lineTo(gx, dimensions.height);
+        context.stroke();
+        context.closePath();
+
+        // Display timestamp at bottom of this line if requested, and it won't overlap
+        if (chartOptions.timestampFormatter && gx < textUntilX) {
+          // Formats the timestamp based on user specified formatting function
+          // SmoothieChart.timeFormatter function above is one such formatting option
+          var tx = new Date(t),
+            ts = chartOptions.timestampFormatter(tx),
+            tsWidth = context.measureText(ts).width;
+          textUntilX = gx - tsWidth - 2;
+          context.fillStyle = chartOptions.labels.fillStyle;
+          context.fillText(ts, gx - tsWidth, dimensions.height - 2);
+        }
+      }
+    }
+
+    // Horizontal (value) dividers.
+    for (var v = 1; v < chartOptions.grid.verticalSections; v++) {
+      var gy = Math.round(v * dimensions.height / chartOptions.grid.verticalSections);
+      if (chartOptions.grid.sharpLines) {
+        gy -= 0.5;
+      }
+      context.beginPath();
+      context.moveTo(0, gy);
+      context.lineTo(dimensions.width, gy);
+      context.stroke();
+      context.closePath();
+    }
+    // Bounding rectangle.
+    if (chartOptions.grid.borderVisible) {
+      context.beginPath();
+      context.strokeRect(0, 0, dimensions.width, dimensions.height);
+      context.closePath();
+    }
+    context.restore();
+
+    // Draw any horizontal lines...
+    if (chartOptions.horizontalLines && chartOptions.horizontalLines.length) {
+      for (var hl = 0; hl < chartOptions.horizontalLines.length; hl++) {
+        var line = chartOptions.horizontalLines[hl],
+            hly = Math.round(valueToYPixel(line.value)) - 0.5;
+        context.strokeStyle = line.color || '#ffffff';
+        context.lineWidth = line.lineWidth || 1;
+        context.beginPath();
+        context.moveTo(0, hly);
+        context.lineTo(dimensions.width, hly);
+        context.stroke();
+        context.closePath();
+      }
+    }
+
+    // For each data set...
+    for (var d = 0; d < this.seriesSet.length; d++) {
+      context.save();
+      var timeSeries = this.seriesSet[d].timeSeries,
+          dataSet = timeSeries.data,
+          seriesOptions = this.seriesSet[d].options;
+
+      // Delete old data that's moved off the left of the chart.
+      timeSeries.dropOldData(oldestValidTime, chartOptions.maxDataSetLength);
+
+      // Set style for this dataSet.
+      context.lineWidth = seriesOptions.lineWidth;
+      context.strokeStyle = seriesOptions.strokeStyle;
+      // Draw the line...
+      context.beginPath();
+      // Retain lastX, lastY for calculating the control points of bezier curves.
+      var firstX = 0, lastX = 0, lastY = 0;
+      for (var i = 0; i < dataSet.length && dataSet.length !== 1; i++) {
+        var x = timeToXPixel(dataSet[i][0]),
+            y = valueToYPixel(dataSet[i][1]);
+
+        if (i === 0) {
+          firstX = x;
+          context.moveTo(x, y);
+        } else {
+          switch (chartOptions.interpolation) {
+            case "linear":
+            case "line": {
+              context.lineTo(x,y);
+              break;
+            }
+            case "bezier":
+            default: {
+              // Great explanation of Bezier curves: http://en.wikipedia.org/wiki/Bezier_curve#Quadratic_curves
+              //
+              // Assuming A was the last point in the line plotted and B is the new point,
+              // we draw a curve with control points P and Q as below.
+              //
+              // A---P
+              //     |
+              //     |
+              //     |
+              //     Q---B
+              //
+              // Importantly, A and P are at the same y coordinate, as are B and Q. This is
+              // so adjacent curves appear to flow as one.
+              //
+              context.bezierCurveTo( // startPoint (A) is implicit from last iteration of loop
+                Math.round((lastX + x) / 2), lastY, // controlPoint1 (P)
+                Math.round((lastX + x)) / 2, y, // controlPoint2 (Q)
+                x, y); // endPoint (B)
+              break;
+            }
+          }
+        }
+
+        lastX = x; lastY = y;
+      }
+
+      if (dataSet.length > 1) {
+        if (seriesOptions.fillStyle) {
+          // Close up the fill region.
+          context.lineTo(dimensions.width + seriesOptions.lineWidth + 1, lastY);
+          context.lineTo(dimensions.width + seriesOptions.lineWidth + 1, dimensions.height + seriesOptions.lineWidth + 1);
+          context.lineTo(firstX, dimensions.height + seriesOptions.lineWidth);
+          context.fillStyle = seriesOptions.fillStyle;
+          context.fill();
+        }
+
+        if (seriesOptions.strokeStyle && seriesOptions.strokeStyle !== 'none') {
+          context.stroke();
+        }
+        context.closePath();
+      }
+      context.restore();
+    }
+
+    // Draw the axis values on the chart.
+    if (!chartOptions.labels.disabled && !isNaN(this.valueRange.min) && !isNaN(this.valueRange.max)) {
+      var maxValueString = parseFloat(this.valueRange.max).toFixed(chartOptions.labels.precision),
+          minValueString = parseFloat(this.valueRange.min).toFixed(chartOptions.labels.precision);
+      context.fillStyle = chartOptions.labels.fillStyle;
+      context.fillText(maxValueString, dimensions.width - context.measureText(maxValueString).width - 2, chartOptions.labels.fontSize);
+      context.fillText(minValueString, dimensions.width - context.measureText(minValueString).width - 2, dimensions.height - 2);
+    }
+
+    context.restore(); // See .save() above.
+  };
+
+  // Sample timestamp formatting function
+  SmoothieChart.timeFormatter = function(date) {
+    function pad2(number) { return (number < 10 ? '0' : '') + number }
+    return pad2(date.getHours()) + ':' + pad2(date.getMinutes()) + ':' + pad2(date.getSeconds());
+  };
+
+  exports.TimeSeries = TimeSeries;
+  exports.SmoothieChart = SmoothieChart;
+
+})(typeof exports === 'undefined' ?  this : exports);
+
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index b9c5edc..7e33c9a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -1,6 +1,6 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.io.OutputStream;
+import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -46,8 +46,6 @@
 
     private FramePushWait framePushWait;
 
-    private SuperFeedManager sfm;
-
     private FeedRuntimeType feedRuntimeType;
 
     private int partition;
@@ -65,7 +63,7 @@
 
     public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
             FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
-            ExecutorService executorService, FrameTupleAccessor fta) {
+            FrameTupleAccessor fta) {
         this.writer = writer;
         this.mode = Mode.FORWARD;
         this.nodePushable = nodePushable;
@@ -73,14 +71,11 @@
         this.policyEnforcer = policyEnforcer;
         this.feedRuntimeType = feedRuntimeType;
         this.partition = partition;
-        this.executorService = executorService;
+        this.executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
         this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
         if (collectStatistics) {
             this.statsOutbox = new LinkedBlockingQueue<Long>();
-            Runnable task = new FeedOperatorStatisticsCollector(feedId, statsOutbox, nodePushable);
-            executorService.execute(task);
-            sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
-            framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, sfm, feedId, nodeId, feedRuntimeType,
+            framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
                     partition, FLUSH_THRESHOLD_TIME);
             Timer timer = new Timer();
             timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
@@ -127,6 +122,10 @@
         writer.open();
     }
 
+    public void reset() {
+        framePushWait.reset();
+    }
+
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         switch (mode) {
@@ -171,7 +170,6 @@
         private IOperatorNodePushable nodePushable;
         private State state;
         private long flushThresholdTime;
-        private SuperFeedManager sfm;
         private static final String EOL = "\n";
         private FeedConnectionId feedId;
         private String nodeId;
@@ -180,13 +178,13 @@
         private AtomicLong numTuplesInInterval = new AtomicLong(0);
         private long period;
         private boolean collectThroughput;
+        private FeedMessageService mesgService;
 
-        public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, SuperFeedManager sfm,
-                FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
+        public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, FeedConnectionId feedId,
+                String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
             this.nodePushable = nodePushable;
             this.flushThresholdTime = flushThresholdTime;
             this.state = State.INTIALIZED;
-            this.sfm = sfm;
             this.feedId = feedId;
             this.nodeId = nodeId;
             this.feedRuntimeType = feedRuntimeType;
@@ -201,15 +199,15 @@
 
         }
 
+        public void reset() {
+            mesgService = null;
+        }
+
         public void notifyFinish(int numTuples) {
             state = State.WAITNG_FOR_NEXT_FRAME;
             numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
         }
 
-        public void resetSuperFeedManager(SuperFeedManager sfm) {
-            this.sfm = sfm;
-        }
-
         @Override
         public void run() {
             if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
@@ -218,46 +216,39 @@
                     if (LOGGER.isLoggable(Level.SEVERE)) {
                         LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
                     }
-                    reportCongestionToSFM(currentTime - startTime);
+                    sendReportToSFM(currentTime - startTime, FeedReportMessageType.CONGESTION);
                 }
             }
             if (collectThroughput) {
-                int instantTput = (int) ((numTuplesInInterval.get() * 1000) / period);
-                reportThroughputToSFM(instantTput);
+                int instantTput = (int) Math.ceil(new Double(((numTuplesInInterval.get() * 1000) / period)));
+                sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT);
             }
             numTuplesInInterval.set(0);
         }
 
-        private void reportCongestionToSFM(long waitingTime) {
-            sendReportToSFM(waitingTime, FeedReportMessageType.CONGESTION);
-        }
-
-        private void reportThroughputToSFM(long throughput) {
-            sendReportToSFM(throughput, FeedReportMessageType.THROUGHPUT);
-        }
-
         private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType) {
             String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
             String operator = "" + feedRuntimeType;
-            String mesg = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
-                    + value + "|" + EOL;
-
-            Socket sc = null;
-            try {
-                while (sfm == null) {
-                    sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
-                    if (sfm == null) {
-                        Thread.sleep(2000);
-                    } else {
-                        break;
+            String message = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
+                    + value + "|" + nodeId + "|" + EOL;
+            if (mesgService == null) {
+                while (mesgService == null) {
+                    mesgService = FeedManager.INSTANCE.getFeedMessageService(feedId);
+                    if (mesgService == null) {
+                        try {
+                            Thread.sleep(2000);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
                     }
                 }
-                sc = new Socket(sfm.getHost(), sfm.getPort());
-                OutputStream os = sc.getOutputStream();
-                os.write(mesg.getBytes());
-            } catch (Exception e) {
+            }
+            try {
+                mesgService.sendMessage(message);
+            } catch (IOException ioe) {
                 if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to report congestion to " + sfm);
+                    LOGGER.warning("Unable to send feed report to SFM for feed " + feedId + " " + feedRuntimeType + "["
+                            + partition + "]");
                 }
             }
         }
@@ -355,6 +346,9 @@
     @Override
     public void fail() throws HyracksDataException {
         writer.fail();
+        if (timer != null) {
+            timer.cancel();
+        }
     }
 
     @Override
@@ -378,9 +372,4 @@
         return "MaterializingFrameWriter using " + writer;
     }
 
-    public void resetSuperFeedManager() {
-        sfm = null;
-        framePushWait.resetSuperFeedManager(null);
-    }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index c0f6222..3293d26 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -64,10 +64,8 @@
         AdapterRuntimeManager adapterRuntimeMgr = null;
         try {
             if (ingestionRuntime == null) {
-                ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, adapterRuntimeMgr);
-                ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
                 FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
-                        FeedRuntimeType.INGESTION, partition, executorService, fta);
+                        FeedRuntimeType.INGESTION, partition, fta);
                 adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
 
                 if (adapter instanceof AbstractFeedDatasourceAdapter) {
@@ -86,6 +84,7 @@
                 adapter = adapterRuntimeMgr.getFeedAdapter();
                 writer.open();
                 adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
+                adapterRuntimeMgr.getAdapterExecutor().getWriter().reset();
                 adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
             }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 6e8cc63..a67e137 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -40,11 +41,16 @@
     private Map<FeedConnectionId, SuperFeedManager> superFeedManagers = new HashMap<FeedConnectionId, SuperFeedManager>();
     private Map<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>> feedRuntimes = new HashMap<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>>();
     private Map<FeedConnectionId, ExecutorService> feedExecutorService = new HashMap<FeedConnectionId, ExecutorService>();
+    private Map<FeedConnectionId, FeedMessageService> feedMessageService = new HashMap<FeedConnectionId, FeedMessageService>();
 
     public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
         return feedExecutorService.get(feedId);
     }
 
+    public FeedMessageService getFeedMessageService(FeedConnectionId feedId) {
+        return feedMessageService.get(feedId);
+    }
+
     @Override
     public void deregisterFeed(FeedConnectionId feedId) {
         try {
@@ -67,6 +73,9 @@
             if (executorService != null && !executorService.isShutdown()) {
                 executorService.shutdownNow();
             }
+            superFeedManagers.remove(feedId);
+            feedMessageService.remove(feedId);
+
         } catch (Exception e) {
             e.printStackTrace();
             if (LOGGER.isLoggable(Level.WARNING)) {
@@ -76,11 +85,12 @@
     }
 
     @Override
-    public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) {
-        ExecutorService execService = feedExecutorService.get(feedRuntime.getFeedRuntimeId().getFeedId());
+    public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
+        FeedConnectionId feedId = feedRuntime.getFeedRuntimeId().getFeedId();
+        ExecutorService execService = feedExecutorService.get(feedId);
         if (execService == null) {
             execService = Executors.newCachedThreadPool();
-            feedExecutorService.put(feedRuntime.getFeedRuntimeId().getFeedId(), execService);
+            feedExecutorService.put(feedId, execService);
         }
 
         Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntime.getFeedRuntimeId()
@@ -120,15 +130,34 @@
     }
 
     @Override
-    public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) {
+    public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception {
+        boolean overriden = superFeedManagers.get(feedId) != null;
         superFeedManagers.put(feedId, sfm);
+        FeedMessageService mesgService = feedMessageService.get(feedId);
+        if (overriden && mesgService != null) {
+            mesgService.stop();
+        }
+        if (mesgService == null || overriden) {
+            mesgService = new FeedMessageService(feedId);
+            feedMessageService.put(feedId, mesgService);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Started Feed Message Service for feed :" + feedId);
+            }
+            mesgService.start();
+        }
     }
 
     @Override
     public void deregisterSuperFeedManager(FeedConnectionId feedId) {
         SuperFeedManager sfm = superFeedManagers.remove(feedId);
         try {
-            sfm.stop();
+            if (sfm.isLocal()) {
+                sfm.stop();
+            }
+            FeedMessageService fms = feedMessageService.remove(feedId);
+            if (fms != null) {
+                fms.stop();
+            }
         } catch (IOException e) {
             e.printStackTrace();
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
index 103eb94..aedd124 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
@@ -23,11 +23,15 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final SuperFeedManager superFeedMaanger;
+    private final String host;
+    private final String nodeId;
+    private final int port;
 
-    public FeedManagerElectMessage(SuperFeedManager superFeedManager) {
-        super(MessageType.SUPER_FEED_MANAGER_ELECT, superFeedManager.getFeedConnectionId());
-        this.superFeedMaanger = superFeedManager;
+    public FeedManagerElectMessage(String host, String nodeId, int port, FeedConnectionId feedId) {
+        super(MessageType.SUPER_FEED_MANAGER_ELECT, feedId);
+        this.host = host;
+        this.port = port;
+        this.nodeId = nodeId;
     }
 
     @Override
@@ -35,13 +39,21 @@
         return MessageType.SUPER_FEED_MANAGER_ELECT;
     }
 
-    public SuperFeedManager getSuperFeedMaanger() {
-        return superFeedMaanger;
-    }
-
     @Override
     public String toString() {
-        return superFeedMaanger.toString();
+        return host + "_" + nodeId + "[" + port + "]";
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getPort() {
+        return port;
     }
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 47e155c..81cbd96 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -74,22 +74,33 @@
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("Registering SUPER Feed MGR for :" + feedId);
                     }
-                    SuperFeedManager sfm = ((FeedManagerElectMessage) feedMessage).getSuperFeedMaanger();
+                    FeedManagerElectMessage mesg = ((FeedManagerElectMessage) feedMessage);
+                    SuperFeedManager sfm = new SuperFeedManager(mesg.getFeedId(), mesg.getHost(), mesg.getNodeId(),
+                            mesg.getPort());
                     synchronized (FeedManager.INSTANCE) {
-                        if (FeedManager.INSTANCE.getSuperFeedManager(feedId) == null) {
-                            FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
-                            INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
-                            String nodeId = ncCtx.getNodeId();
+                        INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
+                        String nodeId = ncCtx.getNodeId();
 
-                            if (sfm.getNodeId().equals(nodeId)) {
-                                System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
-                                sfm.setLocal(true);
-                                sfm.start();
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    LOGGER.info("Started Super Feed Manager for " + feedId);
-                                }
+                        if (sfm.getNodeId().equals(nodeId)) {
+                            SuperFeedManager currentManager = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+                            if (currentManager != null) {
+                                currentManager.stop();
+                                FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
                             }
+
+                            sfm.setLocal(true);
+                            sfm.start();
+                            System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
+
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Started Super Feed Manager for " + feedId);
+                            }
+                        } else {
+                            Thread.sleep(5000);
                         }
+                        FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
+                        System.out.println("REGISTERED SUPER FEED MANAGER ! + is LOCAL ?" + sfm.isLocal());
+
                     }
                     break;
 
@@ -104,6 +115,7 @@
             }
 
         } catch (Exception e) {
+            e.printStackTrace();
             throw new HyracksDataException(e);
         } finally {
             writer.close();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
new file mode 100644
index 0000000..12b22a9
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.CharBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class FeedMessageService {
+
+    private static final char EOL = (char) "\n".getBytes()[0];
+
+    private final FeedConnectionId feedId;
+    private final LinkedBlockingQueue<String> inbox;
+    private final FeedMessageHandler mesgHandler;
+
+    public FeedMessageService(FeedConnectionId feedId) {
+        this.feedId = feedId;
+        this.inbox = new LinkedBlockingQueue<String>();
+        mesgHandler = new FeedMessageHandler(inbox, feedId);
+    }
+
+    public void start() throws UnknownHostException, IOException, Exception {
+        FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgHandler);
+    }
+
+    public void stop() throws IOException {
+        mesgHandler.stop();
+    }
+
+    public void sendMessage(String message) throws IOException {
+        inbox.add(message);
+    }
+
+    private static class FeedMessageHandler implements Runnable {
+
+        private final LinkedBlockingQueue<String> inbox;
+        private final FeedConnectionId feedId;
+        private Socket sfmSocket;
+        private boolean process = true;
+
+        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId) {
+            this.inbox = inbox;
+            this.feedId = feedId;
+        }
+
+        public void run() {
+            try {
+                sfmSocket = obtainSFMSocket();
+                while (process) {
+                    String message = inbox.take();
+                    sfmSocket.getOutputStream().write(message.getBytes());
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                System.out.println("STOPPING MESSAGE HANDLER");
+                if (sfmSocket != null) {
+                    try {
+                        sfmSocket.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+        }
+
+        public void stop() {
+            process = false;
+        }
+
+        private Socket obtainSFMSocket() throws UnknownHostException, IOException, Exception {
+            Socket sfmDirServiceSocket = null;
+            FeedMessageService fsm = FeedManager.INSTANCE.getFeedMessageService(feedId);
+            while (fsm == null) {
+                fsm = FeedManager.INSTANCE.getFeedMessageService(feedId);
+                if (fsm == null) {
+                    Thread.sleep(2000);
+                } else {
+                    break;
+                }
+            }
+            SuperFeedManager sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+
+            System.out.println(" OBTAINED SFM DETAILS WILL TRY TO CONNECT " + sfm);
+
+            try {
+                sfmDirServiceSocket = new Socket(sfm.getHost(), sfm.getPort());
+                System.out.println(" CONNECTED TO " + sfm.getHost() + " " + sfm.getPort());
+
+                while (!sfmDirServiceSocket.isConnected()) {
+                    Thread.sleep(2000);
+                }
+                InputStream in = sfmDirServiceSocket.getInputStream();
+                CharBuffer buffer = CharBuffer.allocate(50);
+                char ch = 0;
+                while (ch != EOL) {
+                    buffer.put(ch);
+                    ch = (char) in.read();
+                }
+                buffer.flip();
+                String s = new String(buffer.array());
+                int port = Integer.parseInt(s.trim());
+                System.out.println("OBTAINED PORT " + port + " WILL CONNECT AT " + sfm.getHost() + " " + port);
+
+                sfmSocket = new Socket(sfm.getHost(), port);
+            } catch (Exception e) {
+                System.out.println(" COULT NOT CONNECT TO " + sfm);
+                e.printStackTrace();
+                throw e;
+            } finally {
+                if (sfmDirServiceSocket != null) {
+                    sfmDirServiceSocket.close();
+                }
+            }
+            return sfmSocket;
+        }
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index cdc4d2a..f933504 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -87,26 +87,29 @@
         @Override
         public void open() throws HyracksDataException {
             FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, feedId, partition);
-            System.out.println("TRYING TO OBTAIN FEED RUNTIME" + runtimeId);
-            feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
-            if (feedRuntime == null) {
-                feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
-                feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+            try {
+                feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
+                if (feedRuntime == null) {
+                    feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
+                    feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+                    }
+                    resumeOldState = false;
+                } else {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
+                    }
+                    feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
+                    resumeOldState = true;
                 }
-                resumeOldState = false;
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
-                }
-                feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
-                resumeOldState = true;
+                FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
+                        runtimeType, partition, fta);
+                coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
+                coreOperatorNodePushable.open();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
             }
-            FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId, runtimeType,
-                    partition, feedExecService, fta);
-            coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
-            coreOperatorNodePushable.open();
         }
 
         @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
new file mode 100644
index 0000000..d0a83d8
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager.FeedReportMessageType;
+
+public class FeedReport implements Comparable {
+
+    private FeedConnectionId feedId;
+    private FeedReportMessageType reportType;
+    private int partition = -1;
+    private FeedRuntimeType runtimeType;
+    private long value = -1;
+    private final String[] representation;
+
+    public FeedReport(String rep) {
+        representation = rep.split("\\|");
+    }
+
+    @Override
+    public String toString() {
+        return getFeedId() + " " + getReportType() + " " + getPartition() + " " + getRuntimeType() + " " + getValue();
+    }
+
+    public FeedConnectionId getFeedId() {
+        if (feedId == null) {
+            String feedIdRep = representation[1];
+            String[] feedIdComp = feedIdRep.split(":");
+            feedId = new FeedConnectionId(feedIdComp[0], feedIdComp[1], feedIdComp[2]);
+        }
+        return feedId;
+    }
+
+    public FeedReportMessageType getReportType() {
+        if (reportType == null) {
+            reportType = FeedReportMessageType.valueOf(representation[0].toUpperCase());
+        }
+        return reportType;
+    }
+
+    public int getPartition() {
+        if (partition < 0) {
+            partition = Integer.parseInt(representation[3]);
+        }
+        return partition;
+    }
+
+    public FeedRuntimeType getRuntimeType() {
+        if (runtimeType == null) {
+            runtimeType = FeedRuntimeType.valueOf(representation[2].toUpperCase());
+        }
+        return runtimeType;
+    }
+
+    public long getValue() {
+        if (value < 0) {
+            value = Long.parseLong(representation[4]);
+        }
+        return value;
+    }
+
+    public String[] getRepresentation() {
+        return representation;
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        if (!(o instanceof FeedReport)) {
+            throw new IllegalArgumentException("Incorrect operand type " + o);
+        }
+
+        FeedReport other = (FeedReport) o;
+        if (!other.getReportType().equals(getReportType())) {
+            throw new IllegalArgumentException("Incorrect operand type " + o);
+        }
+
+        int returnValue = 0;
+
+        switch (getReportType()) {
+            case CONGESTION:
+                returnValue = ranking.get(getRuntimeType()) - ranking.get(other.getRuntimeType());
+                break;
+
+            case THROUGHPUT:
+                returnValue = (int) (other.getValue() - getValue());
+                break;
+        }
+
+        return returnValue;
+    }
+
+    private static Map<FeedRuntimeType, Integer> ranking = populateRanking();
+
+    private static Map<FeedRuntimeType, Integer> populateRanking() {
+        Map<FeedRuntimeType, Integer> ranking = new HashMap<FeedRuntimeType, Integer>();
+        ranking.put(FeedRuntimeType.INGESTION, 1);
+        ranking.put(FeedRuntimeType.COMPUTE, 2);
+        ranking.put(FeedRuntimeType.STORAGE, 3);
+        ranking.put(FeedRuntimeType.COMMIT, 4);
+        return ranking;
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index a0be4ff..278fadd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -32,8 +32,9 @@
 
     /**
      * @param feedRuntime
+     * @throws Exception
      */
-    public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime);
+    public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
 
     /**
      * @param feedRuntimeId
@@ -49,8 +50,9 @@
     /**
      * @param feedId
      * @param sfm
+     * @throws Exception
      */
-    public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm);
+    public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception;
 
     /**
      * @param feedId
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
new file mode 100644
index 0000000..3be39f3
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class MessageListener {
+
+    private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
+
+    private int port;
+    private final LinkedBlockingQueue<String> outbox;
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    private MessageListenerServer listenerServer;
+
+    public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
+        this.port = port;
+        this.outbox = outbox;
+    }
+
+    public void stop() {
+        if (!executorService.isShutdown()) {
+            executorService.shutdownNow();
+        }
+        listenerServer.stop();
+        System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
+
+    }
+
+    public void start() throws IOException {
+        System.out.println("STARTING MESSAGE RECEIVING SERVICE AT " + port);
+        listenerServer = new MessageListenerServer(port, outbox);
+        executorService.execute(listenerServer);
+    }
+
+    private static class MessageListenerServer implements Runnable {
+
+        private final int port;
+        private final LinkedBlockingQueue<String> outbox;
+        private ServerSocket server;
+
+        public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
+            this.port = port;
+            this.outbox = outbox;
+        }
+
+        public void stop() {
+            try {
+                server.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void run() {
+            char EOL = (char) "\n".getBytes()[0];
+            Socket client = null;
+            try {
+                server = new ServerSocket(port);
+                client = server.accept();
+                InputStream in = client.getInputStream();
+                CharBuffer buffer = CharBuffer.allocate(5000);
+                char ch;
+                while (true) {
+                    ch = (char) in.read();
+                    if (((int) ch) == -1) {
+                        break;
+                    }
+                    while (ch != EOL) {
+                        buffer.put(ch);
+                        ch = (char) in.read();
+                    }
+                    buffer.flip();
+                    String s = new String(buffer.array());
+                    synchronized (outbox) {
+                        outbox.add(s + "\n");
+                    }
+                    buffer.position(0);
+                    buffer.limit(5000);
+                }
+
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to start Message listener" + server);
+                }
+            } finally {
+                if (server != null) {
+                    try {
+                        server.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+        }
+
+    }
+
+    private static class MessageParser implements Runnable {
+
+        private Socket client;
+        private IMessageAnalyzer messageAnalyzer;
+        private static final char EOL = (char) "\n".getBytes()[0];
+
+        public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
+            this.client = client;
+            this.messageAnalyzer = messageAnalyzer;
+        }
+
+        @Override
+        public void run() {
+            CharBuffer buffer = CharBuffer.allocate(5000);
+            char ch;
+            try {
+                InputStream in = client.getInputStream();
+                while (true) {
+                    ch = (char) in.read();
+                    if (((int) ch) == -1) {
+                        break;
+                    }
+                    while (ch != EOL) {
+                        buffer.put(ch);
+                        ch = (char) in.read();
+                    }
+                    buffer.flip();
+                    String s = new String(buffer.array());
+                    synchronized (messageAnalyzer) {
+                        messageAnalyzer.getMessageQueue().add(s + "\n");
+                    }
+                    buffer.position(0);
+                    buffer.limit(5000);
+                }
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            } finally {
+                try {
+                    client.close();
+                } catch (IOException ioe) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public static interface IMessageAnalyzer {
+
+        /**
+         * @return
+         */
+        public LinkedBlockingQueue<String> getMessageQueue();
+
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index a51bf6e..fcc9026 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -16,13 +16,12 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.Serializable;
+import java.net.ServerSocket;
 import java.net.Socket;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -30,13 +29,10 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
 
-public class SuperFeedManager implements Serializable {
+public class SuperFeedManager {
 
     private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
-
-    private static final long serialVersionUID = 1L;
     private String host;
 
     private final int port;
@@ -45,22 +41,25 @@
 
     private final FeedConnectionId feedConnectionId;
 
-    private MessageListener listener;
+    // private MessageListener listener;
 
     private boolean isLocal = false;
 
-    private transient ExecutorService executorService;
+    private SuperFeedManagerService sfmService;
+
+    private LinkedBlockingQueue<String> inbox;
 
     public enum FeedReportMessageType {
         CONGESTION,
         THROUGHPUT
     }
 
-    public SuperFeedManager(FeedConnectionId feedId, String nodeId, int port) throws Exception {
+    public SuperFeedManager(FeedConnectionId feedId, String host, String nodeId, int port) throws Exception {
         this.feedConnectionId = feedId;
         this.nodeId = nodeId;
         this.port = port;
-        initialize();
+        this.host = host;
+        this.inbox = new LinkedBlockingQueue<String>();
     }
 
     public int getPort() {
@@ -75,18 +74,6 @@
         return nodeId;
     }
 
-    private void initialize() throws Exception {
-        Map<String, Set<String>> ncs = AsterixRuntimeUtil.getNodeControllerMap();
-        for (Entry<String, Set<String>> entry : ncs.entrySet()) {
-            String ip = entry.getKey();
-            Set<String> nc = entry.getValue();
-            if (nc.contains(nodeId)) {
-                host = ip;
-                break;
-            }
-        }
-    }
-
     public FeedConnectionId getFeedConnectionId() {
         return feedConnectionId;
     }
@@ -100,20 +87,17 @@
     }
 
     public void start() throws IOException {
-        if (listener == null) {
-            if (executorService == null) {
-                executorService = Executors.newCachedThreadPool();
-            }
-            listener = new MessageListener(port, new SuperFeedManagerMessageAnalzer(executorService));
-            listener.start();
+        if (sfmService == null) {
+            ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedConnectionId);
+            sfmService = new SuperFeedManagerService(port, inbox, feedConnectionId);
+            executorService.execute(sfmService);
         }
+        System.out.println("STARTED SUPER FEED MANAGER!");
     }
 
     public void stop() throws IOException {
-        if (listener != null) {
-            listener.stop();
-        }
-        executorService.shutdownNow();
+        sfmService.stop();
+        System.out.println("STOPPED SUPER FEED MANAGER!");
     }
 
     @Override
@@ -121,19 +105,156 @@
         return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
     }
 
+    public static class SuperFeedManagerMessages {
+
+        private static final String EOL = "\n";
+
+        public enum MessageType {
+            FEED_PORT_REQUEST,
+            FEED_PORT_RESPONSE
+        }
+
+        public static final byte[] SEND_PORT_REQUEST = (MessageType.FEED_PORT_REQUEST.name() + EOL).getBytes();
+    }
+
+    private static class SuperFeedManagerService implements Runnable {
+
+        private int nextPort;
+        private ServerSocket server;
+        private String EOM = "\n";
+
+        private final LinkedBlockingQueue<String> inbox;
+        private List<MessageListener> messageListeners;
+        private SFMessageAnalyzer mesgAnalyzer;
+        private final FeedConnectionId feedId;
+        private boolean process = true;
+
+        public SuperFeedManagerService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
+                throws IOException {
+            server = new ServerSocket(port);
+            nextPort = port;
+            this.inbox = inbox;
+            this.feedId = feedId;
+            this.messageListeners = new ArrayList<MessageListener>();
+            mesgAnalyzer = new SFMessageAnalyzer(inbox);
+            FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgAnalyzer);
+        }
+
+        public void stop() {
+            process = false;
+            if (server != null) {
+                try {
+                    server.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+            mesgAnalyzer.stop();
+        }
+
+        @Override
+        public void run() {
+            Socket client = null;
+            while (true) {
+                try {
+                    client = server.accept();
+                    OutputStream os = client.getOutputStream();
+                    nextPort++;
+                    MessageListener listener = new MessageListener(nextPort, inbox);
+                    listener.start();
+                    messageListeners.add(listener);
+                    os.write((nextPort + EOM).getBytes());
+                    os.flush();
+                } catch (IOException e) {
+                    if (process == false) {
+                        break;
+                    }
+                    e.printStackTrace();
+                } finally {
+                    if (client != null) {
+                        try {
+                            client.close();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+
+    private static class SFMessageAnalyzer implements Runnable {
+
+        private final LinkedBlockingQueue<String> inbox;
+        private final Socket socket;
+        private final OutputStream os;
+        private boolean process = true;
+
+        public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox) throws UnknownHostException, IOException {
+            this.inbox = inbox;
+            String ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
+            socket = null; //new Socket(ccHost, 2999);
+            os = null; //socket.getOutputStream();
+        }
+
+        public void stop() {
+            process = false;
+        }
+
+        public void run() {
+            while (process) {
+                try {
+                    String message = inbox.take();
+                    FeedReport report = new FeedReport(message);
+                    FeedReportMessageType mesgType = report.getReportType();
+                    switch (mesgType) {
+                        case THROUGHPUT:
+                            //send message to FeedHealthDataReceiver at CC (2999)
+                            //          os.write(message.getBytes());
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.warning("SuperFeedManager received message " + message);
+                            }
+                            break;
+                        case CONGESTION:
+                            // congestionInbox.add(report);
+                            break;
+                    }
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning(message);
+                    }
+                    if (os != null) {
+                        os.close();
+                    }
+                    if (socket != null) {
+                        socket.close();
+                    }
+                } catch (InterruptedException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                } catch (IOException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
     private static class SuperFeedManagerMessageAnalzer implements IMessageAnalyzer {
 
         private String ccHost;
         private CongestionAnalyzer congestionAnalyzer;
         private LinkedBlockingQueue<FeedReport> congestionInbox = new LinkedBlockingQueue<FeedReport>();
+        private ExecutorService executorService;
+        private LinkedBlockingQueue<String> mesgInbox = new LinkedBlockingQueue<String>();
 
-        public SuperFeedManagerMessageAnalzer(ExecutorService executorService) {
+        public SuperFeedManagerMessageAnalzer(FeedConnectionId feedId) {
             ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
             congestionAnalyzer = new CongestionAnalyzer(congestionInbox);
+            executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
             executorService.execute(congestionAnalyzer);
         }
 
-        @Override
         public void receiveMessage(String message) {
             Socket socket = null;
             OutputStream os = null;
@@ -146,6 +267,9 @@
                         socket = new Socket(ccHost, 2999);
                         os = socket.getOutputStream();
                         os.write(message.getBytes());
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.warning("SuperFeedManager received message " + message);
+                        }
                         break;
                     case CONGESTION:
                         congestionInbox.add(report);
@@ -168,6 +292,11 @@
             }
         }
 
+        @Override
+        public LinkedBlockingQueue<String> getMessageQueue() {
+            return mesgInbox;
+        }
+
     }
 
     private static class CongestionAnalyzer implements Runnable {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 94250d2..acd9d7a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -240,8 +240,8 @@
 
                 int currentMeasureTweets = gen.getNumFlushedTweets();
 
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
                             + ((currentMeasureTweets - prevMeasuredTweets) / tputDuration) + " ID "
                             + Thread.currentThread().getId());
                 }