mechanism for collection of feed statistics (checkpoint 1)
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
index 8955342..89af26a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
@@ -21,6 +21,12 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.imageio.ImageIO;
import javax.servlet.http.HttpServlet;
@@ -28,17 +34,22 @@
import javax.servlet.http.HttpServletResponse;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.RemoteSocketMessageListener;
public class FeedDashboardServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+ private static final Logger LOGGER = Logger.getLogger(FeedDashboardServlet.class.getName());
- private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+ private static final char EOL = (char) "\n".getBytes()[0];
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
@@ -86,6 +97,7 @@
String feedName = request.getParameter("feed");
String datasetName = request.getParameter("dataset");
String dataverseName = request.getParameter("dataverse");
+ FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
String outStr = null;
if (requestURI.startsWith("/webui/static")) {
@@ -97,7 +109,18 @@
Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverseName, feedName);
MetadataManager.INSTANCE.commitTransaction(ctx);
+ FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId,
+ FeedActivityType.FEED_BEGIN, FeedActivityType.FEED_RESUME);
+ Map<String, String> activityDetails = activity.getFeedActivityDetails();
+
+ String host = activityDetails.get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST);
+ int port = Integer.parseInt(activityDetails
+ .get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Feed Report Source :" + host + "[" + port + "]");
+ }
outStr = String.format(sb.toString(), dataverseName, datasetName, feedName);
+ initiateSubscription(feedId, host, port);
}
PrintWriter out = response.getWriter();
@@ -107,4 +130,28 @@
}
}
+ private void initiateSubscription(FeedConnectionId feedId, String host, int port) throws IOException {
+ LinkedBlockingQueue<String> outbox = new LinkedBlockingQueue<String>();
+ Socket sc = new Socket(host, port);
+ InputStream in = sc.getInputStream();
+
+ CharBuffer buffer = CharBuffer.allocate(50);
+ char ch = 0;
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array());
+ int feedSubscriptionPort = Integer.parseInt(s.trim());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Response from Super Feed Manager Report Service " + port + " will connect at " + host + " "
+ + port);
+ }
+
+ FeedLifecycleListener.INSTANCE.registerFeedReportQueue(feedId, outbox);
+ RemoteSocketMessageListener listener = new RemoteSocketMessageListener(host, feedSubscriptionPort, outbox);
+ listener.start();
+ }
+
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index 65b3883..1bec374 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -25,6 +26,9 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+
public class FeedDataProviderServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@@ -37,6 +41,8 @@
String datasetName = request.getParameter("dataset");
String dataverseName = request.getParameter("dataverse");
+ String report = getFeedReport(feedName, datasetName, dataverseName);
+ System.out.println(" RECEIVED REPORT " + report);
JSONObject obj = new JSONObject();
try {
obj.put("time", System.currentTimeMillis());
@@ -49,4 +55,15 @@
out.println(obj.toString());
}
+ private String getFeedReport(String feedName, String datasetName, String dataverseName) {
+ FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
+ LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
+ String report = null;
+ try {
+ report = queue.take();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return report;
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 5f59bea..00bed93 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -110,7 +110,7 @@
private IMessageAnalyzer healthDataParser;
private MessageListener feedHealthDataListener;
private ExecutorService executorService = Executors.newCachedThreadPool();
-
+ private Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
private State state;
private FeedLifecycleListener() {
@@ -177,6 +177,18 @@
}
+ public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+ feedReportQueue.put(feedId, queue);
+ }
+
+ public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+ feedReportQueue.remove(feedId);
+ }
+
+ public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
+ return feedReportQueue.get(feedId);
+ }
+
private static class Message {
public JobId jobId;
@@ -228,6 +240,7 @@
private Map<JobId, FeedInfo> registeredFeeds = new HashMap<JobId, FeedInfo>();
private FeedMessenger feedMessenger;
private LinkedBlockingQueue<FeedMessengerMessage> messengerOutbox;
+ private int superFeedManagerPort = 3000;
public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
this.inbox = inbox;
@@ -377,13 +390,18 @@
throw new IllegalStateException("Unknown node " + superFeedManagerHost);
}
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, ""
+ + superFeedManagerPort);
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
+ superFeedManagerHost);
}
- FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost, 3000,
- feedInfo.feedConnectionId);
+ FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
+ superFeedManagerPort, feedInfo.feedConnectionId);
+ superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 72d996d..addf706 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -5,7 +5,9 @@
var random = new TimeSeries();
setInterval(function() {
- random.append(new Date().getTime(), Math.random() * 10000);
+
+ $.get('/feed/data?dataverse=%s&dataset=%s&feed=%s', function(data) {
+ feedSeries.append(data[“time”], data[“value”]);
}, 500);
function createTimeline() {