fixed issue related to election of super feed manager
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
index dbcf566..19fec30 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -14,33 +14,26 @@
  */
 package edu.uci.ics.asterix.api.http.servlet;
 
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
-import java.util.logging.Level;
 
-import javax.servlet.ServletContext;
+import javax.imageio.ImageIO;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.json.JSONObject;
-
-import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
-import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.aql.base.Statement;
-import edu.uci.ics.asterix.aql.base.Statement.Kind;
-import edu.uci.ics.asterix.aql.parser.AQLParser;
-import edu.uci.ics.asterix.aql.parser.ParseException;
-import edu.uci.ics.asterix.aql.parser.TokenMgrError;
-import edu.uci.ics.asterix.aql.translator.AqlTranslator;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.result.ResultReader;
-import edu.uci.ics.asterix.result.ResultUtils;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 
 public class FeedServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
@@ -51,48 +44,74 @@
 
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
-        response.setContentType("application/json");
-        response.setCharacterEncoding("utf-8");
+        String resourcePath = null;
+        String requestURI = request.getRequestURI();
 
-        PrintWriter out = response.getWriter();
-
-        DisplayFormat format = DisplayFormat.HTML;
-
-        String contentType = request.getContentType();
-
-        if ((contentType == null) || (contentType.equals("text/plain"))) {
-            format = DisplayFormat.TEXT;
-        } else if (contentType.equals("application/json")) {
-            format = DisplayFormat.JSON;
+        if (requestURI.equals("/")) {
+            response.setContentType("text/html");
+            resourcePath = "/feed/home.html";
+        } else {
+            resourcePath = requestURI;
         }
 
-
-        ServletContext context = getServletContext();
-        IHyracksClientConnection hcc;
-        IHyracksDataset hds;
-
         try {
-            synchronized (context) {
-                hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
-
-                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
-                if (hds == null) {
-                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
-                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
-                }
+            InputStream is = FeedServlet.class.getResourceAsStream(resourcePath);
+            if (is == null) {
+                response.sendError(HttpServletResponse.SC_NOT_FOUND);
+                return;
             }
 
-        } catch (ParseException | TokenMgrError | edu.uci.ics.asterix.aqlplus.parser.TokenMgrError pe) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
-            String errorMessage = ResultUtils.buildParseExceptionMessage(pe, "");
-            JSONObject errorResp = ResultUtils.getErrorResponse(2, errorMessage, "", "");
-            out.write(errorResp.toString());
-            response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
-        } catch (Exception e) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
-            ResultUtils.apiErrorHandler(out, e);
-            response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+            // Special handler for font files and .png resources
+            if (resourcePath.endsWith(".png")) {
+
+                BufferedImage img = ImageIO.read(is);
+                OutputStream outputStream = response.getOutputStream();
+                String formatName = "png";
+                response.setContentType("image/png");
+                ImageIO.write(img, formatName, outputStream);
+                outputStream.close();
+                return;
+
+            }
+
+            response.setCharacterEncoding("utf-8");
+            InputStreamReader isr = new InputStreamReader(is);
+            StringBuilder sb = new StringBuilder();
+            BufferedReader br = new BufferedReader(isr);
+            String line = br.readLine();
+
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+            }
+
+            String outStr = null;
+            if (requestURI.startsWith("/webui/static")) {
+                outStr = sb.toString();
+            } else {
+                MetadataManager.INSTANCE.init();
+                MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+                List<FeedActivity> lfa = MetadataManager.INSTANCE.getActiveFeeds(ctx, null, null);
+                StringBuilder ldStr = new StringBuilder();
+                ldStr.append("Feeds");
+                for (FeedActivity feedActivity : lfa) {
+                    ldStr.append("<br />");
+                    ldStr.append("<br />");
+                    ldStr.append("<a href=\"/feed/dashboard?dataverse=" + feedActivity.getDataverseName() + "&feed="
+                            + feedActivity.getFeedName() + "&dataset=" + feedActivity.getDatasetName() + "\">"
+                            + feedActivity + "</a>");
+                    ldStr.append("<br />");
+                }
+
+                outStr = String.format(sb.toString(), ldStr.toString());
+                MetadataManager.INSTANCE.commitTransaction(ctx);
+
+            }
+
+            PrintWriter out = response.getWriter();
+            out.println(outStr);
+        } catch (ACIDException | MetadataException e) {
+
         }
     }
-
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 38f0fd2..b197765 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -568,7 +568,7 @@
                 }
 
                 for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
-                    int selected = i + random.nextInt(nodeNames.size() - i);
+                    int selected = i + random.nextInt(nodeNamesClone.size() - i);
                     int selNodeIndex = b[selected];
                     selectedNodes.add(nodes[selNodeIndex]);
                     int temp = b[0];
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 4b60764..c2f9192 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -23,6 +23,8 @@
 
 import edu.uci.ics.asterix.api.http.servlet.APIServlet;
 import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.FeedDashboardServlet;
+import edu.uci.ics.asterix.api.http.servlet.FeedDataProviderServlet;
 import edu.uci.ics.asterix.api.http.servlet.FeedServlet;
 import edu.uci.ics.asterix.api.http.servlet.QueryAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
@@ -145,5 +147,9 @@
 
         feedServer.setHandler(context);
         context.addServlet(new ServletHolder(new FeedServlet()), "/");
+        context.addServlet(new ServletHolder(new FeedDashboardServlet()), "/feed/dashboard");
+        context.addServlet(new ServletHolder(new FeedDataProviderServlet()), "/feed/data");
+
+        // add paths here
     }
 }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 63be317..74c2c8a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -26,6 +26,8 @@
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -65,6 +67,8 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.MessageListener;
+import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
 import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -100,9 +104,14 @@
 
     public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
 
+    public static final int FEED_HEALTH_PORT = 2999;
+
     private LinkedBlockingQueue<Message> jobEventInbox;
     private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
     private Map<FeedInfo, List<String>> dependentFeeds = new HashMap<FeedInfo, List<String>>();
+    private IMessageAnalyzer healthDataParser;
+    private MessageListener feedHealthDataListener;
+    private ExecutorService executorService = Executors.newCachedThreadPool();
 
     private State state;
 
@@ -111,11 +120,20 @@
         feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
         responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
         feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
-
-        new Thread(feedJobNotificationHandler).start();
-        new Thread(feedWorkRequestResponseHandler).start();
+        this.healthDataParser = new FeedHealthDataParser();
+        feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser);
+        try {
+            feedHealthDataListener.start();
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to start Feed health data listener");
+            }
+        }
+        executorService.execute(feedJobNotificationHandler);
+        executorService.execute(feedWorkRequestResponseHandler);
         ClusterManager.INSTANCE.registerSubscriber(this);
         state = AsterixClusterProperties.INSTANCE.getState();
+
     }
 
     private final FeedJobNotificationHandler feedJobNotificationHandler;
@@ -194,6 +212,15 @@
         }
     }
 
+    private static class FeedHealthDataParser implements IMessageAnalyzer {
+
+        @Override
+        public void receiveMessage(String message) {
+            System.out.println(" HEALTH DATA RECEIVED :" + message);
+        }
+
+    }
+
     private static class FeedJobNotificationHandler implements Runnable, Serializable {
 
         private static final long serialVersionUID = 1L;
@@ -309,14 +336,23 @@
                     ingestLocs.append(ingestLoc);
                     ingestLocs.append(",");
                 }
+                if (ingestLocs.length() > 1) {
+                    ingestLocs.deleteCharAt(ingestLocs.length() - 1);
+                }
                 for (String computeLoc : feedInfo.computeLocations) {
                     computeLocs.append(computeLoc);
                     computeLocs.append(",");
                 }
+                if (computeLocs.length() > 1) {
+                    computeLocs.deleteCharAt(computeLocs.length() - 1);
+                }
                 for (String storageLoc : feedInfo.storageLocations) {
                     storageLocs.append(storageLoc);
                     storageLocs.append(",");
                 }
+                if (storageLocs.length() > 1) {
+                    storageLocs.deleteCharAt(storageLocs.length() - 1);
+                }
 
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b806be0..acda21e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -1371,6 +1371,10 @@
             RemoteException {
         List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
         Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
+        boolean invalidArgs = (dataverse == null && dataset != null);
+        if (invalidArgs) {
+            throw new MetadataException("Invalid arguments " + dataverse + " " + dataset);
+        }
         try {
             ITupleReference searchKey = createTuple();
             FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
@@ -1382,10 +1386,13 @@
             FeedConnectionId fid = null;
             Set<FeedConnectionId> terminatedFeeds = new HashSet<FeedConnectionId>();
             for (FeedActivity fa : results) {
-                if (dataset != null
-                        && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
-                    continue;
+                if (dataverse != null) {
+                    if (dataset != null
+                            && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
+                        continue;
+                    }
                 }
+
                 fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
                 switch (fa.getActivityType()) {
                     case FEED_RESUME:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 3a481f3..b9c5edc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -42,7 +42,7 @@
 
     private String nodeId;
 
-    private long FLUSH_THRESHOLD_TIME = 5000;
+    public static final long FLUSH_THRESHOLD_TIME = 5000;
 
     private FramePushWait framePushWait;
 
@@ -206,21 +206,23 @@
             numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
         }
 
+        public void resetSuperFeedManager(SuperFeedManager sfm) {
+            this.sfm = sfm;
+        }
+
         @Override
         public void run() {
             if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
                 long currentTime = System.currentTimeMillis();
                 if (currentTime - startTime > flushThresholdTime) {
                     if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("CONGESTION!!!!!!!!  BY " + nodePushable);
+                        LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
                     }
                     reportCongestionToSFM(currentTime - startTime);
                 }
             }
             if (collectThroughput) {
                 int instantTput = (int) ((numTuplesInInterval.get() * 1000) / period);
-                System.out.println("Instantaneous throughput " + instantTput + " (" + feedRuntimeType + "[" + partition
-                        + "]" + ")");
                 reportThroughputToSFM(instantTput);
             }
             numTuplesInInterval.set(0);
@@ -376,4 +378,9 @@
         return "MaterializingFrameWriter using " + writer;
     }
 
+    public void resetSuperFeedManager() {
+        sfm = null;
+        framePushWait.resetSuperFeedManager(null);
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 8f3a9b6..c0f6222 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -21,7 +21,6 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
-import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -102,6 +101,7 @@
                     LOGGER.info("Continuing on failure as per feed policy");
                 }
                 adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+                FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
                 writer.fail();
                 /*
                  * Do not de-register feed 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 2f499e6..a51bf6e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -15,11 +15,9 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
-import java.net.ServerSocket;
 import java.net.Socket;
-import java.nio.CharBuffer;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -29,6 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
 
 public class SuperFeedManager implements Serializable {
@@ -44,10 +45,12 @@
 
     private final FeedConnectionId feedConnectionId;
 
-    private SuperFeedManagerListener listener;
+    private MessageListener listener;
 
     private boolean isLocal = false;
 
+    private transient ExecutorService executorService;
+
     public enum FeedReportMessageType {
         CONGESTION,
         THROUGHPUT
@@ -98,7 +101,10 @@
 
     public void start() throws IOException {
         if (listener == null) {
-            listener = new SuperFeedManagerListener(port);
+            if (executorService == null) {
+                executorService = Executors.newCachedThreadPool();
+            }
+            listener = new MessageListener(port, new SuperFeedManagerMessageAnalzer(executorService));
             listener.start();
         }
     }
@@ -107,6 +113,7 @@
         if (listener != null) {
             listener.stop();
         }
+        executorService.shutdownNow();
     }
 
     @Override
@@ -114,111 +121,119 @@
         return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
     }
 
-    private static class SuperFeedManagerListener implements Serializable {
+    private static class SuperFeedManagerMessageAnalzer implements IMessageAnalyzer {
 
-        private static final long serialVersionUID = 1L;
-        private ServerSocket server;
-        private int port;
-        private LinkedBlockingQueue<String> messages;
-        private CongestionAnalyzer ca;
+        private String ccHost;
+        private CongestionAnalyzer congestionAnalyzer;
+        private LinkedBlockingQueue<FeedReport> congestionInbox = new LinkedBlockingQueue<FeedReport>();
 
-        private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-        public SuperFeedManagerListener(int port) throws IOException {
-            this.port = port;
-            messages = new LinkedBlockingQueue<String>();
-            ca = new CongestionAnalyzer(messages);
-            executorService.execute(ca);
+        public SuperFeedManagerMessageAnalzer(ExecutorService executorService) {
+            ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
+            congestionAnalyzer = new CongestionAnalyzer(congestionInbox);
+            executorService.execute(congestionAnalyzer);
         }
 
-        public void stop() {
-            executorService.shutdown();
-        }
-
-        public void start() throws IOException {
-            server = new ServerSocket(port);
-            while (true) {
-                Socket client = server.accept();
-                executorService.execute(new MessageProcessor(client, this));
+        @Override
+        public void receiveMessage(String message) {
+            Socket socket = null;
+            OutputStream os = null;
+            try {
+                FeedReport report = new FeedReport(message);
+                FeedReportMessageType mesgType = report.getReportType();
+                switch (mesgType) {
+                    case THROUGHPUT:
+                        //send message to FeedHealthDataReceiver at CC (2999)
+                        socket = new Socket(ccHost, 2999);
+                        os = socket.getOutputStream();
+                        os.write(message.getBytes());
+                        break;
+                    case CONGESTION:
+                        congestionInbox.add(report);
+                        break;
+                }
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning(message);
+                }
+                if (os != null) {
+                    os.close();
+                }
+                if (socket != null) {
+                    socket.close();
+                }
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("unable to send message to FeedHealthDataReceiver");
+                }
             }
         }
 
-        public synchronized void notifyMessage(String s) {
-            messages.add(s);
-        }
     }
 
     private static class CongestionAnalyzer implements Runnable {
 
-        private LinkedBlockingQueue<String> messages;
+        private final LinkedBlockingQueue<FeedReport> inbox;
+        private FeedReport lastMaxReport;
+        private int congestionCount;
+        private long lastReportedCongestion = 0;
+        private long closeEnoughTimeBound = FeedFrameWriter.FLUSH_THRESHOLD_TIME * 2;
 
-        public CongestionAnalyzer(LinkedBlockingQueue<String> messages) {
-            this.messages = messages;
+        public CongestionAnalyzer(LinkedBlockingQueue<FeedReport> inbox) {
+            this.inbox = inbox;
         }
 
         @Override
         public void run() {
-            try {
-                while (true) {
-                    try {
-                        String message = messages.take();
-                        String[] messageComponents = message.split("|");
-                        SuperFeedManager.FeedReportMessageType mesgType = FeedReportMessageType
-                                .valueOf(messageComponents[0]);
-                        switch (mesgType) {
-                            case THROUGHPUT:
-                            case CONGESTION:
-                        }
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning(message);
-                        }
-                    } catch (InterruptedException ie) {
-                        throw ie;
-                    } catch (Exception e) {
-
-                    }
-
-                }
-
-            } catch (InterruptedException ie) {
-                // do nothing
-            }
-        }
-    }
-
-    private static class MessageProcessor implements Runnable {
-
-        private SuperFeedManagerListener listener;
-        private Socket client;
-        private static final char EOL = (char) "\n".getBytes()[0];
-
-        public MessageProcessor(Socket client, SuperFeedManagerListener listener) {
-            this.listener = listener;
-            this.client = client;
-        }
-
-        @Override
-        public void run() {
-            CharBuffer buffer = CharBuffer.allocate(2000);
-            char ch;
-            try {
-                InputStream in = client.getInputStream();
-                ch = (char) in.read();
-                while (ch != EOL) {
-                    buffer.put(ch);
-                    ch = (char) in.read();
-                }
-                buffer.flip();
-                String s = new String(buffer.array());
-                listener.notifyMessage(s);
-            } catch (IOException ioe) {
-            } finally {
+            FeedReport report;
+            while (true) {
                 try {
-                    client.close();
-                } catch (IOException ioe) {
-                    // do nothing
+                    report = inbox.take();
+                    long currentReportedCongestionTime = System.currentTimeMillis();
+                    boolean closeEnough = lastReportedCongestion == 0
+                            || currentReportedCongestionTime - lastReportedCongestion < closeEnoughTimeBound;
+                    if (lastMaxReport == null) {
+                        lastMaxReport = report;
+                        if (closeEnough) {
+                            congestionCount++;
+                        }
+                    } else {
+                        if (report.compareTo(lastMaxReport) > 0) {
+                            lastMaxReport = report;
+                            congestionCount = 1;
+                        } else if (report.compareTo(lastMaxReport) == 0) {
+                            lastMaxReport = report;
+                            if (closeEnough) {
+                                congestionCount++;
+                                if (congestionCount > 5) {
+                                    FeedRuntimeType sourceOfCongestion = null;
+                                    switch (lastMaxReport.getRuntimeType()) {
+                                        case INGESTION:
+                                            sourceOfCongestion = FeedRuntimeType.COMPUTE;
+                                            break;
+                                        case COMPUTE:
+                                            sourceOfCongestion = FeedRuntimeType.STORAGE;
+                                            break;
+                                        case STORAGE:
+                                        case COMMIT:
+                                            sourceOfCongestion = FeedRuntimeType.COMMIT;
+                                            break;
+                                    }
+                                    if (LOGGER.isLoggable(Level.WARNING)) {
+                                        LOGGER.warning(" Need elasticity at " + sourceOfCongestion + " as per report "
+                                                + lastMaxReport);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    lastReportedCongestion = System.currentTimeMillis();
+
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
                 }
             }
+
         }
     }
-}
+
+}
\ No newline at end of file