fixed issue related to election of super feed manager
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
index dbcf566..19fec30 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -14,33 +14,26 @@
*/
package edu.uci.ics.asterix.api.http.servlet;
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.List;
-import java.util.logging.Level;
-import javax.servlet.ServletContext;
+import javax.imageio.ImageIO;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.json.JSONObject;
-
-import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
-import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.aql.base.Statement;
-import edu.uci.ics.asterix.aql.base.Statement.Kind;
-import edu.uci.ics.asterix.aql.parser.AQLParser;
-import edu.uci.ics.asterix.aql.parser.ParseException;
-import edu.uci.ics.asterix.aql.parser.TokenMgrError;
-import edu.uci.ics.asterix.aql.translator.AqlTranslator;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.result.ResultReader;
-import edu.uci.ics.asterix.result.ResultUtils;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
public class FeedServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@@ -51,48 +44,74 @@
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
- response.setContentType("application/json");
- response.setCharacterEncoding("utf-8");
+ String resourcePath = null;
+ String requestURI = request.getRequestURI();
- PrintWriter out = response.getWriter();
-
- DisplayFormat format = DisplayFormat.HTML;
-
- String contentType = request.getContentType();
-
- if ((contentType == null) || (contentType.equals("text/plain"))) {
- format = DisplayFormat.TEXT;
- } else if (contentType.equals("application/json")) {
- format = DisplayFormat.JSON;
+ if (requestURI.equals("/")) {
+ response.setContentType("text/html");
+ resourcePath = "/feed/home.html";
+ } else {
+ resourcePath = requestURI;
}
-
- ServletContext context = getServletContext();
- IHyracksClientConnection hcc;
- IHyracksDataset hds;
-
try {
- synchronized (context) {
- hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
-
- hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
- context.setAttribute(HYRACKS_DATASET_ATTR, hds);
- }
+ InputStream is = FeedServlet.class.getResourceAsStream(resourcePath);
+ if (is == null) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return;
}
- } catch (ParseException | TokenMgrError | edu.uci.ics.asterix.aqlplus.parser.TokenMgrError pe) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
- String errorMessage = ResultUtils.buildParseExceptionMessage(pe, "");
- JSONObject errorResp = ResultUtils.getErrorResponse(2, errorMessage, "", "");
- out.write(errorResp.toString());
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- } catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
- ResultUtils.apiErrorHandler(out, e);
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ // Special handler for font files and .png resources
+ if (resourcePath.endsWith(".png")) {
+
+ BufferedImage img = ImageIO.read(is);
+ OutputStream outputStream = response.getOutputStream();
+ String formatName = "png";
+ response.setContentType("image/png");
+ ImageIO.write(img, formatName, outputStream);
+ outputStream.close();
+ return;
+
+ }
+
+ response.setCharacterEncoding("utf-8");
+ InputStreamReader isr = new InputStreamReader(is);
+ StringBuilder sb = new StringBuilder();
+ BufferedReader br = new BufferedReader(isr);
+ String line = br.readLine();
+
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ }
+
+ String outStr = null;
+ if (requestURI.startsWith("/webui/static")) {
+ outStr = sb.toString();
+ } else {
+ MetadataManager.INSTANCE.init();
+ MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+ List<FeedActivity> lfa = MetadataManager.INSTANCE.getActiveFeeds(ctx, null, null);
+ StringBuilder ldStr = new StringBuilder();
+ ldStr.append("Feeds");
+ for (FeedActivity feedActivity : lfa) {
+ ldStr.append("<br />");
+ ldStr.append("<br />");
+ ldStr.append("<a href=\"/feed/dashboard?dataverse=" + feedActivity.getDataverseName() + "&feed="
+ + feedActivity.getFeedName() + "&dataset=" + feedActivity.getDatasetName() + "\">"
+ + feedActivity + "</a>");
+ ldStr.append("<br />");
+ }
+
+ outStr = String.format(sb.toString(), ldStr.toString());
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+
+ }
+
+ PrintWriter out = response.getWriter();
+ out.println(outStr);
+ } catch (ACIDException | MetadataException e) {
+
}
}
-
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 38f0fd2..b197765 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -568,7 +568,7 @@
}
for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
- int selected = i + random.nextInt(nodeNames.size() - i);
+ int selected = i + random.nextInt(nodeNamesClone.size() - i);
int selNodeIndex = b[selected];
selectedNodes.add(nodes[selNodeIndex]);
int temp = b[0];
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 4b60764..c2f9192 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -23,6 +23,8 @@
import edu.uci.ics.asterix.api.http.servlet.APIServlet;
import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.FeedDashboardServlet;
+import edu.uci.ics.asterix.api.http.servlet.FeedDataProviderServlet;
import edu.uci.ics.asterix.api.http.servlet.FeedServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
@@ -145,5 +147,9 @@
feedServer.setHandler(context);
context.addServlet(new ServletHolder(new FeedServlet()), "/");
+ context.addServlet(new ServletHolder(new FeedDashboardServlet()), "/feed/dashboard");
+ context.addServlet(new ServletHolder(new FeedDataProviderServlet()), "/feed/data");
+
+ // add paths here
}
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 63be317..74c2c8a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -26,6 +26,8 @@
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -65,6 +67,8 @@
import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.MessageListener;
+import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -100,9 +104,14 @@
public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
+ public static final int FEED_HEALTH_PORT = 2999;
+
private LinkedBlockingQueue<Message> jobEventInbox;
private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
private Map<FeedInfo, List<String>> dependentFeeds = new HashMap<FeedInfo, List<String>>();
+ private IMessageAnalyzer healthDataParser;
+ private MessageListener feedHealthDataListener;
+ private ExecutorService executorService = Executors.newCachedThreadPool();
private State state;
@@ -111,11 +120,20 @@
feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
-
- new Thread(feedJobNotificationHandler).start();
- new Thread(feedWorkRequestResponseHandler).start();
+ this.healthDataParser = new FeedHealthDataParser();
+ feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser);
+ try {
+ feedHealthDataListener.start();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start Feed health data listener");
+ }
+ }
+ executorService.execute(feedJobNotificationHandler);
+ executorService.execute(feedWorkRequestResponseHandler);
ClusterManager.INSTANCE.registerSubscriber(this);
state = AsterixClusterProperties.INSTANCE.getState();
+
}
private final FeedJobNotificationHandler feedJobNotificationHandler;
@@ -194,6 +212,15 @@
}
}
+ private static class FeedHealthDataParser implements IMessageAnalyzer {
+
+ @Override
+ public void receiveMessage(String message) {
+ System.out.println(" HEALTH DATA RECEIVED :" + message);
+ }
+
+ }
+
private static class FeedJobNotificationHandler implements Runnable, Serializable {
private static final long serialVersionUID = 1L;
@@ -309,14 +336,23 @@
ingestLocs.append(ingestLoc);
ingestLocs.append(",");
}
+ if (ingestLocs.length() > 1) {
+ ingestLocs.deleteCharAt(ingestLocs.length() - 1);
+ }
for (String computeLoc : feedInfo.computeLocations) {
computeLocs.append(computeLoc);
computeLocs.append(",");
}
+ if (computeLocs.length() > 1) {
+ computeLocs.deleteCharAt(computeLocs.length() - 1);
+ }
for (String storageLoc : feedInfo.storageLocations) {
storageLocs.append(storageLoc);
storageLocs.append(",");
}
+ if (storageLocs.length() > 1) {
+ storageLocs.deleteCharAt(storageLocs.length() - 1);
+ }
feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b806be0..acda21e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -1371,6 +1371,10 @@
RemoteException {
List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
+ boolean invalidArgs = (dataverse == null && dataset != null);
+ if (invalidArgs) {
+ throw new MetadataException("Invalid arguments " + dataverse + " " + dataset);
+ }
try {
ITupleReference searchKey = createTuple();
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
@@ -1382,10 +1386,13 @@
FeedConnectionId fid = null;
Set<FeedConnectionId> terminatedFeeds = new HashSet<FeedConnectionId>();
for (FeedActivity fa : results) {
- if (dataset != null
- && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
- continue;
+ if (dataverse != null) {
+ if (dataset != null
+ && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
+ continue;
+ }
}
+
fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
switch (fa.getActivityType()) {
case FEED_RESUME:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 3a481f3..b9c5edc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -42,7 +42,7 @@
private String nodeId;
- private long FLUSH_THRESHOLD_TIME = 5000;
+ public static final long FLUSH_THRESHOLD_TIME = 5000;
private FramePushWait framePushWait;
@@ -206,21 +206,23 @@
numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
}
+ public void resetSuperFeedManager(SuperFeedManager sfm) {
+ this.sfm = sfm;
+ }
+
@Override
public void run() {
if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
long currentTime = System.currentTimeMillis();
if (currentTime - startTime > flushThresholdTime) {
if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("CONGESTION!!!!!!!! BY " + nodePushable);
+ LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
}
reportCongestionToSFM(currentTime - startTime);
}
}
if (collectThroughput) {
int instantTput = (int) ((numTuplesInInterval.get() * 1000) / period);
- System.out.println("Instantaneous throughput " + instantTput + " (" + feedRuntimeType + "[" + partition
- + "]" + ")");
reportThroughputToSFM(instantTput);
}
numTuplesInInterval.set(0);
@@ -376,4 +378,9 @@
return "MaterializingFrameWriter using " + writer;
}
+ public void resetSuperFeedManager() {
+ sfm = null;
+ framePushWait.resetSuperFeedManager(null);
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 8f3a9b6..c0f6222 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -21,7 +21,6 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
-import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -102,6 +101,7 @@
LOGGER.info("Continuing on failure as per feed policy");
}
adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+ FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
writer.fail();
/*
* Do not de-register feed
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 2f499e6..a51bf6e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -15,11 +15,9 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
-import java.net.ServerSocket;
import java.net.Socket;
-import java.nio.CharBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -29,6 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
public class SuperFeedManager implements Serializable {
@@ -44,10 +45,12 @@
private final FeedConnectionId feedConnectionId;
- private SuperFeedManagerListener listener;
+ private MessageListener listener;
private boolean isLocal = false;
+ private transient ExecutorService executorService;
+
public enum FeedReportMessageType {
CONGESTION,
THROUGHPUT
@@ -98,7 +101,10 @@
public void start() throws IOException {
if (listener == null) {
- listener = new SuperFeedManagerListener(port);
+ if (executorService == null) {
+ executorService = Executors.newCachedThreadPool();
+ }
+ listener = new MessageListener(port, new SuperFeedManagerMessageAnalzer(executorService));
listener.start();
}
}
@@ -107,6 +113,7 @@
if (listener != null) {
listener.stop();
}
+ executorService.shutdownNow();
}
@Override
@@ -114,111 +121,119 @@
return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
}
- private static class SuperFeedManagerListener implements Serializable {
+ private static class SuperFeedManagerMessageAnalzer implements IMessageAnalyzer {
- private static final long serialVersionUID = 1L;
- private ServerSocket server;
- private int port;
- private LinkedBlockingQueue<String> messages;
- private CongestionAnalyzer ca;
+ private String ccHost;
+ private CongestionAnalyzer congestionAnalyzer;
+ private LinkedBlockingQueue<FeedReport> congestionInbox = new LinkedBlockingQueue<FeedReport>();
- private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- public SuperFeedManagerListener(int port) throws IOException {
- this.port = port;
- messages = new LinkedBlockingQueue<String>();
- ca = new CongestionAnalyzer(messages);
- executorService.execute(ca);
+ public SuperFeedManagerMessageAnalzer(ExecutorService executorService) {
+ ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
+ congestionAnalyzer = new CongestionAnalyzer(congestionInbox);
+ executorService.execute(congestionAnalyzer);
}
- public void stop() {
- executorService.shutdown();
- }
-
- public void start() throws IOException {
- server = new ServerSocket(port);
- while (true) {
- Socket client = server.accept();
- executorService.execute(new MessageProcessor(client, this));
+ @Override
+ public void receiveMessage(String message) {
+ Socket socket = null;
+ OutputStream os = null;
+ try {
+ FeedReport report = new FeedReport(message);
+ FeedReportMessageType mesgType = report.getReportType();
+ switch (mesgType) {
+ case THROUGHPUT:
+ //send message to FeedHealthDataReceiver at CC (2999)
+ socket = new Socket(ccHost, 2999);
+ os = socket.getOutputStream();
+ os.write(message.getBytes());
+ break;
+ case CONGESTION:
+ congestionInbox.add(report);
+ break;
+ }
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(message);
+ }
+ if (os != null) {
+ os.close();
+ }
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("unable to send message to FeedHealthDataReceiver");
+ }
}
}
- public synchronized void notifyMessage(String s) {
- messages.add(s);
- }
}
private static class CongestionAnalyzer implements Runnable {
- private LinkedBlockingQueue<String> messages;
+ private final LinkedBlockingQueue<FeedReport> inbox;
+ private FeedReport lastMaxReport;
+ private int congestionCount;
+ private long lastReportedCongestion = 0;
+ private long closeEnoughTimeBound = FeedFrameWriter.FLUSH_THRESHOLD_TIME * 2;
- public CongestionAnalyzer(LinkedBlockingQueue<String> messages) {
- this.messages = messages;
+ public CongestionAnalyzer(LinkedBlockingQueue<FeedReport> inbox) {
+ this.inbox = inbox;
}
@Override
public void run() {
- try {
- while (true) {
- try {
- String message = messages.take();
- String[] messageComponents = message.split("|");
- SuperFeedManager.FeedReportMessageType mesgType = FeedReportMessageType
- .valueOf(messageComponents[0]);
- switch (mesgType) {
- case THROUGHPUT:
- case CONGESTION:
- }
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(message);
- }
- } catch (InterruptedException ie) {
- throw ie;
- } catch (Exception e) {
-
- }
-
- }
-
- } catch (InterruptedException ie) {
- // do nothing
- }
- }
- }
-
- private static class MessageProcessor implements Runnable {
-
- private SuperFeedManagerListener listener;
- private Socket client;
- private static final char EOL = (char) "\n".getBytes()[0];
-
- public MessageProcessor(Socket client, SuperFeedManagerListener listener) {
- this.listener = listener;
- this.client = client;
- }
-
- @Override
- public void run() {
- CharBuffer buffer = CharBuffer.allocate(2000);
- char ch;
- try {
- InputStream in = client.getInputStream();
- ch = (char) in.read();
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- listener.notifyMessage(s);
- } catch (IOException ioe) {
- } finally {
+ FeedReport report;
+ while (true) {
try {
- client.close();
- } catch (IOException ioe) {
- // do nothing
+ report = inbox.take();
+ long currentReportedCongestionTime = System.currentTimeMillis();
+ boolean closeEnough = lastReportedCongestion == 0
+ || currentReportedCongestionTime - lastReportedCongestion < closeEnoughTimeBound;
+ if (lastMaxReport == null) {
+ lastMaxReport = report;
+ if (closeEnough) {
+ congestionCount++;
+ }
+ } else {
+ if (report.compareTo(lastMaxReport) > 0) {
+ lastMaxReport = report;
+ congestionCount = 1;
+ } else if (report.compareTo(lastMaxReport) == 0) {
+ lastMaxReport = report;
+ if (closeEnough) {
+ congestionCount++;
+ if (congestionCount > 5) {
+ FeedRuntimeType sourceOfCongestion = null;
+ switch (lastMaxReport.getRuntimeType()) {
+ case INGESTION:
+ sourceOfCongestion = FeedRuntimeType.COMPUTE;
+ break;
+ case COMPUTE:
+ sourceOfCongestion = FeedRuntimeType.STORAGE;
+ break;
+ case STORAGE:
+ case COMMIT:
+ sourceOfCongestion = FeedRuntimeType.COMMIT;
+ break;
+ }
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(" Need elasticity at " + sourceOfCongestion + " as per report "
+ + lastMaxReport);
+ }
+ }
+ }
+ }
+ }
+ lastReportedCongestion = System.currentTimeMillis();
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
}
+
}
}
-}
+
+}
\ No newline at end of file