1) fixed bug in super feed manager (de) initialization 2) experimneting with feed manager
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
new file mode 100644
index 0000000..d035610
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+
+import javax.imageio.ImageIO;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+
+public class FeedDashboardServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+
+ private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String resourcePath = null;
+ String requestURI = request.getRequestURI();
+
+ if (requestURI.equals("/")) {
+ response.setContentType("text/html");
+ resourcePath = "/feed/dashboard.html";
+ } else {
+ resourcePath = requestURI + ".html";
+ }
+
+ try {
+ InputStream is = FeedDashboardServlet.class.getResourceAsStream(resourcePath);
+ if (is == null) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+
+ // Special handler for font files and .png resources
+ if (resourcePath.endsWith(".png")) {
+
+ BufferedImage img = ImageIO.read(is);
+ OutputStream outputStream = response.getOutputStream();
+ String formatName = "png";
+ response.setContentType("image/png");
+ ImageIO.write(img, formatName, outputStream);
+ outputStream.close();
+ return;
+
+ }
+
+ response.setCharacterEncoding("utf-8");
+ InputStreamReader isr = new InputStreamReader(is);
+ StringBuilder sb = new StringBuilder();
+ BufferedReader br = new BufferedReader(isr);
+ String line = br.readLine();
+
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ }
+
+ String feedName = request.getParameter("feed");
+ String datasetName = request.getParameter("dataset");
+ String dataverseName = request.getParameter("dataverse");
+
+ String outStr = null;
+ if (requestURI.startsWith("/webui/static")) {
+ outStr = sb.toString();
+ } else {
+ MetadataManager.INSTANCE.init();
+ MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+
+ Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverseName, feedName);
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+
+ StringBuilder ldStr = new StringBuilder();
+ ldStr.append("Feed " + feed.getDataverseName() + " " + feed.getFeedName() + " " + feed.getAdaptorName());
+ ldStr.append("<br />");
+ ldStr.append("<br />");
+ ldStr.append("Graph");
+ ldStr.append("<br />");
+ ldStr.append("<br />");
+
+ outStr = String.format(sb.toString(), dataverseName, datasetName, feedName);
+
+ }
+
+ PrintWriter out = response.getWriter();
+ out.println(outStr);
+ } catch (ACIDException | MetadataException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
new file mode 100644
index 0000000..65b3883
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Random;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class FeedDataProviderServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ private Random random = new Random();
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+
+ String feedName = request.getParameter("feed");
+ String datasetName = request.getParameter("dataset");
+ String dataverseName = request.getParameter("dataverse");
+
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("time", System.currentTimeMillis());
+ obj.put("value", random.nextInt(100) + "");
+ } catch (JSONException jsoe) {
+ throw new IOException(jsoe);
+ }
+
+ PrintWriter out = response.getWriter();
+ out.println(obj.toString());
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 74c2c8a..5f59bea 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -41,8 +41,8 @@
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.file.JobSpecificationUtils;
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure.FailureType;
import edu.uci.ics.asterix.metadata.MetadataException;
@@ -73,11 +73,9 @@
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -121,7 +119,7 @@
responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
this.healthDataParser = new FeedHealthDataParser();
- feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser);
+ feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser.getMessageQueue());
try {
feedHealthDataListener.start();
} catch (Exception e) {
@@ -214,9 +212,11 @@
private static class FeedHealthDataParser implements IMessageAnalyzer {
+ private LinkedBlockingQueue<String> inbox = new LinkedBlockingQueue<String>();
+
@Override
- public void receiveMessage(String message) {
- System.out.println(" HEALTH DATA RECEIVED :" + message);
+ public LinkedBlockingQueue<String> getMessageQueue() {
+ return inbox;
}
}
@@ -363,11 +363,27 @@
int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
- SuperFeedManager sfm = new SuperFeedManager(feedInfo.feedConnectionId, superFeedManagerHost, 3000);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + sfm);
+ Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
+ String instanceName = cluster.getInstanceName();
+ String node = superFeedManagerHost.substring(instanceName.length() + 1);
+ String hostIp = null;
+ for (Node n : cluster.getNode()) {
+ if (n.getId().equals(node)) {
+ hostIp = n.getClusterIp();
+ break;
+ }
}
- FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(sfm);
+ if (hostIp == null) {
+ throw new IllegalStateException("Unknown node " + superFeedManagerHost);
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
+ + superFeedManagerHost);
+ }
+
+ FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost, 3000,
+ feedInfo.feedConnectionId);
messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
@@ -745,7 +761,7 @@
IOperatorDescriptor feedMessenger;
AlgebricksPartitionConstraint messengerPc;
- List<String> locations = new ArrayList<String>();
+ Set<String> locations = new HashSet<String>();
locations.addAll(feedInfo.computeLocations);
locations.addAll(feedInfo.ingestLocations);
locations.addAll(feedInfo.storageLocations);
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
new file mode 100644
index 0000000..6576f75
--- /dev/null
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -0,0 +1,27 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <script type="text/javascript" src="/webui/static/js/smoothie.js"></script>
+ <script type="text/javascript"></script>
+ </head>
+ <body>
+ <canvas id="chart" width="400" height="100"></canvas>
+
+ <script type="text/javascript">
+
+ // Random data
+
+ var feedSeries = new TimeSeries();
+ setInterval(function() {
+ line1.append(new Date().getTime(), Math.random());
+ line2.append(new Date().getTime(), Math.random());
+ }, 1000);
+
+ var smoothie = new SmoothieChart({ grid: { strokeStyle: 'rgb(125, 0, 0)', fillStyle: 'rgb(60, 0, 0)', lineWidth: 1, millisPerLine: 250, verticalSections: 6 } });
+ smoothie.addTimeSeries(line1, { strokeStyle: 'rgb(0, 255, 0)', fillStyle: 'rgba(0, 255, 0, 0.4)', lineWidth: 3 });
+ smoothie.addTimeSeries(line2, { strokeStyle: 'rgb(255, 0, 255)', fillStyle: 'rgba(255, 0, 255, 0.3)', lineWidth: 3 });
+ smoothie.streamTo(document.getElementById("mycanvas"), 1000);
+
+ </script>
+ </body>
+</html>
diff --git a/asterix-app/src/main/resources/feed/home.html b/asterix-app/src/main/resources/feed/home.html
new file mode 100644
index 0000000..6c74cad
--- /dev/null
+++ b/asterix-app/src/main/resources/feed/home.html
@@ -0,0 +1,83 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta name="description" content="ASTERIX WEB PAGE" />
+<meta name="viewport" content="width=device-width, initial-scale=1.0">
+<link href='http://fonts.googleapis.com/css?family=Bitter|PT+Sans+Caption|Open+Sans' rel='stylesheet' type='text/css'>
+<script src="/webui/static/js/jquery.min.js"></script>
+
+<link href="/webui/static/css/bootstrap.min.css" rel="stylesheet" type="text/css" />
+<link href="/webui/static/css/bootstrap-responsive.min.css" rel="stylesheet" type="text/css" />
+
+<script src="/webui/static/js/bootstrap.min.js"></script>
+
+<link href="/webui/static/css/style.css" rel="stylesheet" type="text/css" />
+
+
+<meta charset=utf-8 />
+<title>AsterixDB Web Interface</title>
+</head>
+
+<body>
+ <div class="navbar navbar-fixed-top">
+ <div class="navbar-inner">
+ <div class="container">
+ <a class="btn btn-navbar" data-toggle="collapse" data-target=".nav-collapse">
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ </a>
+
+ <!-- Temporary logo placeholder -->
+ <a class="brand" href="#"><img src="/webui/static/img/finalasterixlogo.png"></a>
+
+ <div class="nav-collapse collapse">
+ <ul class="nav">
+ <li><a href="http://code.google.com/p/asterixdb/" target="_blank">
+ Open source<img class="extarget" src="/webui/static/img/targetlink.png"/></a></li>
+ <li><a href="http://code.google.com/p/asterixdb/issues/list" target="_blank">
+ File issues<img class="extarget" src="/webui/static/img/targetlink.png"/></a></li>
+ <li><a href="https://groups.google.com/forum/?fromgroups#!forum/asterixdb-users" target="_blank">
+ Contact<img class="extarget" src="/webui/static/img/targetlink.png"/></a></li>
+ </ul>
+ </div><!--/.nav-collapse -->
+ </div>
+ </div>
+ </div>
+
+ <div class="content">
+ <div class="container">
+ <div class="row-fluid">
+ <div class="span12">
+ %s
+ </div>
+
+ </div>
+ </div>
+</div>
+ <div class="footer">
+ <section class="line"><hr></section>
+ <section class="content">
+ <section class="left">
+ </section>
+ <section class="right">
+ © Copyright 2013 University of California, Irvine
+ </section>
+ </section>
+ </div>
+</body>
+</html>
diff --git a/asterix-app/src/main/resources/webui/static/js/smoothie.js b/asterix-app/src/main/resources/webui/static/js/smoothie.js
new file mode 100644
index 0000000..4e46fa7
--- /dev/null
+++ b/asterix-app/src/main/resources/webui/static/js/smoothie.js
@@ -0,0 +1,660 @@
+// MIT License:
+//
+// Copyright (c) 2010-2013, Joe Walnes
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+/**
+ * Smoothie Charts - http://smoothiecharts.org/
+ * (c) 2010-2013, Joe Walnes
+ * 2013, Drew Noakes
+ *
+ * v1.0: Main charting library, by Joe Walnes
+ * v1.1: Auto scaling of axis, by Neil Dunn
+ * v1.2: fps (frames per second) option, by Mathias Petterson
+ * v1.3: Fix for divide by zero, by Paul Nikitochkin
+ * v1.4: Set minimum, top-scale padding, remove timeseries, add optional timer to reset bounds, by Kelley Reynolds
+ * v1.5: Set default frames per second to 50... smoother.
+ * .start(), .stop() methods for conserving CPU, by Dmitry Vyal
+ * options.interpolation = 'bezier' or 'line', by Dmitry Vyal
+ * options.maxValue to fix scale, by Dmitry Vyal
+ * v1.6: minValue/maxValue will always get converted to floats, by Przemek Matylla
+ * v1.7: options.grid.fillStyle may be a transparent color, by Dmitry A. Shashkin
+ * Smooth rescaling, by Kostas Michalopoulos
+ * v1.8: Set max length to customize number of live points in the dataset with options.maxDataSetLength, by Krishna Narni
+ * v1.9: Display timestamps along the bottom, by Nick and Stev-io
+ * (https://groups.google.com/forum/?fromgroups#!topic/smoothie-charts/-Ywse8FCpKI%5B1-25%5D)
+ * Refactored by Krishna Narni, to support timestamp formatting function
+ * v1.10: Switch to requestAnimationFrame, removed the now obsoleted options.fps, by Gergely Imreh
+ * v1.11: options.grid.sharpLines option added, by @drewnoakes
+ * Addressed warning seen in Firefox when seriesOption.fillStyle undefined, by @drewnoakes
+ * v1.12: Support for horizontalLines added, by @drewnoakes
+ * Support for yRangeFunction callback added, by @drewnoakes
+ * v1.13: Fixed typo (#32), by @alnikitich
+ * v1.14: Timer cleared when last TimeSeries removed (#23), by @davidgaleano
+ * Fixed diagonal line on chart at start/end of data stream, by @drewnoakes
+ * v1.15: Support for npm package (#18), by @dominictarr
+ * Fixed broken removeTimeSeries function (#24) by @davidgaleano
+ * Minor performance and tidying, by @drewnoakes
+ * v1.16: Bug fix introduced in v1.14 relating to timer creation/clearance (#23), by @drewnoakes
+ * TimeSeries.append now deals with out-of-order timestamps, and can merge duplicates, by @zacwitte (#12)
+ * Documentation and some local variable renaming for clarity, by @drewnoakes
+ * v1.17: Allow control over font size (#10), by @drewnoakes
+ * Timestamp text won't overlap, by @drewnoakes
+ * v1.18: Allow control of max/min label precision, by @drewnoakes
+ * Added 'borderVisible' chart option, by @drewnoakes
+ * Allow drawing series with fill but no stroke (line), by @drewnoakes
+ */
+
+;(function(exports) {
+
+ var Util = {
+ extend: function() {
+ arguments[0] = arguments[0] || {};
+ for (var i = 1; i < arguments.length; i++)
+ {
+ for (var key in arguments[i])
+ {
+ if (arguments[i].hasOwnProperty(key))
+ {
+ if (typeof(arguments[i][key]) === 'object') {
+ if (arguments[i][key] instanceof Array) {
+ arguments[0][key] = arguments[i][key];
+ } else {
+ arguments[0][key] = Util.extend(arguments[0][key], arguments[i][key]);
+ }
+ } else {
+ arguments[0][key] = arguments[i][key];
+ }
+ }
+ }
+ }
+ return arguments[0];
+ }
+ };
+
+ /**
+ * Initialises a new <code>TimeSeries</code> with optional data options.
+ *
+ * Options are of the form (defaults shown):
+ *
+ * <pre>
+ * {
+ * resetBounds: true, // enables/disables automatic scaling of the y-axis
+ * resetBoundsInterval: 3000 // the period between scaling calculations, in millis
+ * }
+ * </pre>
+ *
+ * Presentation options for TimeSeries are specified as an argument to <code>SmoothieChart.addTimeSeries</code>.
+ *
+ * @constructor
+ */
+ function TimeSeries(options) {
+ this.options = Util.extend({}, TimeSeries.defaultOptions, options);
+ this.data = [];
+ this.maxValue = Number.NaN; // The maximum value ever seen in this TimeSeries.
+ this.minValue = Number.NaN; // The minimum value ever seen in this TimeSeries.
+ }
+
+ TimeSeries.defaultOptions = {
+ resetBoundsInterval: 3000,
+ resetBounds: true
+ };
+
+ /**
+ * Recalculate the min/max values for this <code>TimeSeries</code> object.
+ *
+ * This causes the graph to scale itself in the y-axis.
+ */
+ TimeSeries.prototype.resetBounds = function() {
+ if (this.data.length) {
+ // Walk through all data points, finding the min/max value
+ this.maxValue = this.data[0][1];
+ this.minValue = this.data[0][1];
+ for (var i = 1; i < this.data.length; i++) {
+ var value = this.data[i][1];
+ if (value > this.maxValue) {
+ this.maxValue = value;
+ }
+ if (value < this.minValue) {
+ this.minValue = value;
+ }
+ }
+ } else {
+ // No data exists, so set min/max to NaN
+ this.maxValue = Number.NaN;
+ this.minValue = Number.NaN;
+ }
+ };
+
+ /**
+ * Adds a new data point to the <code>TimeSeries</code>, preserving chronological order.
+ *
+ * @param timestamp the position, in time, of this data point
+ * @param value the value of this data point
+ * @param sumRepeatedTimeStampValues if <code>timestamp</code> has an exact match in the series, this flag controls
+ * whether it is replaced, or the values summed (defaults to false.)
+ */
+ TimeSeries.prototype.append = function(timestamp, value, sumRepeatedTimeStampValues) {
+ // Rewind until we hit an older timestamp
+ var i = this.data.length - 1;
+ while (i > 0 && this.data[i][0] > timestamp) {
+ i--;
+ }
+
+ if (this.data.length > 0 && this.data[i][0] === timestamp) {
+ // Update existing values in the array
+ if (sumRepeatedTimeStampValues) {
+ // Sum this value into the existing 'bucket'
+ this.data[i][1] += value;
+ value = this.data[i][1];
+ } else {
+ // Replace the previous value
+ this.data[i][1] = value;
+ }
+ } else if (i < this.data.length - 1) {
+ // Splice into the correct position to keep timestamps in order
+ this.data.splice(i + 1, 0, [timestamp, value]);
+ } else {
+ // Add to the end of the array
+ this.data.push([timestamp, value]);
+ }
+
+ this.maxValue = isNaN(this.maxValue) ? value : Math.max(this.maxValue, value);
+ this.minValue = isNaN(this.minValue) ? value : Math.min(this.minValue, value);
+ };
+
+ TimeSeries.prototype.dropOldData = function(oldestValidTime, maxDataSetLength) {
+ // We must always keep one expired data point as we need this to draw the
+ // line that comes into the chart from the left, but any points prior to that can be removed.
+ var removeCount = 0;
+ while (this.data.length - removeCount >= maxDataSetLength && this.data[removeCount + 1][0] < oldestValidTime) {
+ removeCount++;
+ }
+ if (removeCount !== 0) {
+ this.data.splice(0, removeCount);
+ }
+ };
+
+ /**
+ * Initialises a new <code>SmoothieChart</code>.
+ *
+ * Options are optional, and should be of the form below. Just specify the values you
+ * need and the rest will be given sensible defaults as shown:
+ *
+ * <pre>
+ * {
+ * minValue: undefined, // specify to clamp the lower y-axis to a given value
+ * maxValue: undefined, // specify to clamp the upper y-axis to a given value
+ * maxValueScale: 1, // allows proportional padding to be added above the chart. for 10% padding, specify 1.1.
+ * yRangeFunction: undefined, // function({min: , max: }) { return {min: , max: }; }
+ * scaleSmoothing: 0.125, // controls the rate at which y-value zoom animation occurs
+ * millisPerPixel: 20, // sets the speed at which the chart pans by
+ * maxDataSetLength: 2,
+ * interpolation: 'bezier' // or 'linear'
+ * timestampFormatter: null, // Optional function to format time stamps for bottom of chart. You may use SmoothieChart.timeFormatter, or your own: function(date) { return ''; }
+ * horizontalLines: [], // [ { value: 0, color: '#ffffff', lineWidth: 1 } ],
+ * grid:
+ * {
+ * fillStyle: '#000000', // the background colour of the chart
+ * lineWidth: 1, // the pixel width of grid lines
+ * strokeStyle: '#777777', // colour of grid lines
+ * millisPerLine: 1000, // distance between vertical grid lines
+ * sharpLines: false, // controls whether grid lines are 1px sharp, or softened
+ * verticalSections: 2, // number of vertical sections marked out by horizontal grid lines
+ * borderVisible: true // whether the grid lines trace the border of the chart or not
+ * },
+ * labels
+ * {
+ * disabled: false, // enables/disables labels showing the min/max values
+ * fillStyle: '#ffffff', // colour for text of labels,
+ * fontSize: 15,
+ * fontFamily: 'sans-serif',
+ * precision: 2
+ * },
+ * }
+ * </pre>
+ *
+ * @constructor
+ */
+ function SmoothieChart(options) {
+ this.options = Util.extend({}, SmoothieChart.defaultChartOptions, options);
+ this.seriesSet = [];
+ this.currentValueRange = 1;
+ this.currentVisMinValue = 0;
+ }
+
+ SmoothieChart.defaultChartOptions = {
+ millisPerPixel: 20,
+ maxValueScale: 1,
+ interpolation: 'bezier',
+ scaleSmoothing: 0.125,
+ maxDataSetLength: 2,
+ grid: {
+ fillStyle: '#000000',
+ strokeStyle: '#777777',
+ lineWidth: 1,
+ sharpLines: false,
+ millisPerLine: 1000,
+ verticalSections: 2,
+ borderVisible: true
+ },
+ labels: {
+ fillStyle: '#ffffff',
+ disabled: false,
+ fontSize: 10,
+ fontFamily: 'monospace',
+ precision: 2
+ },
+ horizontalLines: []
+ };
+
+ // Based on http://inspirit.github.com/jsfeat/js/compatibility.js
+ SmoothieChart.AnimateCompatibility = (function() {
+ // TODO this global variable will cause bugs if more than one chart is used and the browser does not support *requestAnimationFrame natively
+ var lastTime = 0,
+ requestAnimationFrame = function(callback, element) {
+ var requestAnimationFrame =
+ window.requestAnimationFrame ||
+ window.webkitRequestAnimationFrame ||
+ window.mozRequestAnimationFrame ||
+ window.oRequestAnimationFrame ||
+ window.msRequestAnimationFrame ||
+ function(callback) {
+ var currTime = new Date().getTime(),
+ timeToCall = Math.max(0, 16 - (currTime - lastTime)),
+ id = window.setTimeout(function() {
+ callback(currTime + timeToCall);
+ }, timeToCall);
+ lastTime = currTime + timeToCall;
+ return id;
+ };
+ return requestAnimationFrame.call(window, callback, element);
+ },
+ cancelAnimationFrame = function(id) {
+ var cancelAnimationFrame =
+ window.cancelAnimationFrame ||
+ function(id) {
+ clearTimeout(id);
+ };
+ return cancelAnimationFrame.call(window, id);
+ };
+
+ return {
+ requestAnimationFrame: requestAnimationFrame,
+ cancelAnimationFrame: cancelAnimationFrame
+ };
+ })();
+
+ SmoothieChart.defaultSeriesPresentationOptions = {
+ lineWidth: 1,
+ strokeStyle: '#ffffff'
+ };
+
+ /**
+ * Adds a <code>TimeSeries</code> to this chart, with optional presentation options.
+ *
+ * Presentation options should be of the form (defaults shown):
+ *
+ * <pre>
+ * {
+ * lineWidth: 1,
+ * strokeStyle: '#ffffff',
+ * fillStyle: undefined
+ * }
+ * </pre>
+ */
+ SmoothieChart.prototype.addTimeSeries = function(timeSeries, options) {
+ this.seriesSet.push({timeSeries: timeSeries, options: Util.extend({}, SmoothieChart.defaultSeriesPresentationOptions, options)});
+ if (timeSeries.options.resetBounds && timeSeries.options.resetBoundsInterval > 0) {
+ timeSeries.resetBoundsTimerId = setInterval(
+ function() {
+ timeSeries.resetBounds();
+ },
+ timeSeries.options.resetBoundsInterval
+ );
+ }
+ };
+
+ /**
+ * Removes the specified <code>TimeSeries</code> from the chart.
+ */
+ SmoothieChart.prototype.removeTimeSeries = function(timeSeries) {
+ // Find the correct timeseries to remove, and remove it
+ var numSeries = this.seriesSet.length;
+ for (var i = 0; i < numSeries; i++) {
+ if (this.seriesSet[i].timeSeries === timeSeries) {
+ this.seriesSet.splice(i, 1);
+ break;
+ }
+ }
+ // If a timer was operating for that timeseries, remove it
+ if (timeSeries.resetBoundsTimerId) {
+ // Stop resetting the bounds, if we were
+ clearInterval(timeSeries.resetBoundsTimerId);
+ }
+ };
+
+ /**
+ * Instructs the <code>SmoothieChart</code> to start rendering to the provided canvas, with specified delay.
+ *
+ * @param canvas the target canvas element
+ * @param delayMillis an amount of time to wait before a data point is shown. This can prevent the end of the series
+ * from appearing on screen, with new values flashing into view, at the expense of some latency.
+ */
+ SmoothieChart.prototype.streamTo = function(canvas, delayMillis) {
+ this.canvas = canvas;
+ this.delay = delayMillis;
+ this.start();
+ };
+
+ /**
+ * Starts the animation of this chart.
+ */
+ SmoothieChart.prototype.start = function() {
+ if (this.frame) {
+ // We're already running, so just return
+ return;
+ }
+
+ // Renders a frame, and queues the next frame for later rendering
+ var animate = function() {
+ this.frame = SmoothieChart.AnimateCompatibility.requestAnimationFrame(function() {
+ this.render();
+ animate();
+ }.bind(this));
+ }.bind(this);
+
+ animate();
+ };
+
+ /**
+ * Stops the animation of this chart.
+ */
+ SmoothieChart.prototype.stop = function() {
+ if (this.frame) {
+ SmoothieChart.AnimateCompatibility.cancelAnimationFrame(this.frame);
+ delete this.frame;
+ }
+ };
+
+ SmoothieChart.prototype.updateValueRange = function() {
+ // Calculate the current scale of the chart, from all time series.
+ var chartOptions = this.options,
+ chartMaxValue = Number.NaN,
+ chartMinValue = Number.NaN;
+
+ for (var d = 0; d < this.seriesSet.length; d++) {
+ // TODO(ndunn): We could calculate / track these values as they stream in.
+ var timeSeries = this.seriesSet[d].timeSeries;
+ if (!isNaN(timeSeries.maxValue)) {
+ chartMaxValue = !isNaN(chartMaxValue) ? Math.max(chartMaxValue, timeSeries.maxValue) : timeSeries.maxValue;
+ }
+
+ if (!isNaN(timeSeries.minValue)) {
+ chartMinValue = !isNaN(chartMinValue) ? Math.min(chartMinValue, timeSeries.minValue) : timeSeries.minValue;
+ }
+ }
+
+ // Scale the chartMaxValue to add padding at the top if required
+ if (chartOptions.maxValue != null) {
+ chartMaxValue = chartOptions.maxValue;
+ } else {
+ chartMaxValue *= chartOptions.maxValueScale;
+ }
+
+ // Set the minimum if we've specified one
+ if (chartOptions.minValue != null) {
+ chartMinValue = chartOptions.minValue;
+ }
+
+ // If a custom range function is set, call it
+ if (this.options.yRangeFunction) {
+ var range = this.options.yRangeFunction({min: chartMinValue, max: chartMaxValue});
+ chartMinValue = range.min;
+ chartMaxValue = range.max;
+ }
+
+ if (!isNaN(chartMaxValue) && !isNaN(chartMinValue)) {
+ var targetValueRange = chartMaxValue - chartMinValue;
+ this.currentValueRange += chartOptions.scaleSmoothing * (targetValueRange - this.currentValueRange);
+ this.currentVisMinValue += chartOptions.scaleSmoothing * (chartMinValue - this.currentVisMinValue);
+ }
+
+ this.valueRange = { min: chartMinValue, max: chartMaxValue };
+ };
+
+ SmoothieChart.prototype.render = function(canvas, time) {
+ canvas = canvas || this.canvas;
+ time = time || new Date().getTime() - (this.delay || 0);
+
+ // TODO only render if the chart has moved at least 1px since the last rendered frame
+
+ // Round time down to pixel granularity, so motion appears smoother.
+ time -= time % this.options.millisPerPixel;
+
+ var context = canvas.getContext('2d'),
+ chartOptions = this.options,
+ dimensions = { top: 0, left: 0, width: canvas.clientWidth, height: canvas.clientHeight },
+ // Calculate the threshold time for the oldest data points.
+ oldestValidTime = time - (dimensions.width * chartOptions.millisPerPixel),
+ valueToYPixel = function(value) {
+ var offset = value - this.currentVisMinValue;
+ return this.currentValueRange === 0
+ ? dimensions.height
+ : dimensions.height - (Math.round((offset / this.currentValueRange) * dimensions.height));
+ }.bind(this),
+ timeToXPixel = function(t) {
+ return Math.round(dimensions.width - ((time - t) / chartOptions.millisPerPixel));
+ };
+
+ this.updateValueRange();
+
+ context.font = chartOptions.labels.fontSize + 'px ' + chartOptions.labels.fontFamily;
+
+ // Save the state of the canvas context, any transformations applied in this method
+ // will get removed from the stack at the end of this method when .restore() is called.
+ context.save();
+
+ // Move the origin.
+ context.translate(dimensions.left, dimensions.top);
+
+ // Create a clipped rectangle - anything we draw will be constrained to this rectangle.
+ // This prevents the occasional pixels from curves near the edges overrunning and creating
+ // screen cheese (that phrase should need no explanation).
+ context.beginPath();
+ context.rect(0, 0, dimensions.width, dimensions.height);
+ context.clip();
+
+ // Clear the working area.
+ context.save();
+ context.fillStyle = chartOptions.grid.fillStyle;
+ context.clearRect(0, 0, dimensions.width, dimensions.height);
+ context.fillRect(0, 0, dimensions.width, dimensions.height);
+ context.restore();
+
+ // Grid lines...
+ context.save();
+ context.lineWidth = chartOptions.grid.lineWidth;
+ context.strokeStyle = chartOptions.grid.strokeStyle;
+ // Vertical (time) dividers.
+ if (chartOptions.grid.millisPerLine > 0) {
+ var textUntilX = dimensions.width - context.measureText(minValueString).width + 4;
+ for (var t = time - (time % chartOptions.grid.millisPerLine);
+ t >= oldestValidTime;
+ t -= chartOptions.grid.millisPerLine) {
+ var gx = timeToXPixel(t);
+ if (chartOptions.grid.sharpLines) {
+ gx -= 0.5;
+ }
+ context.beginPath();
+ context.moveTo(gx, 0);
+ context.lineTo(gx, dimensions.height);
+ context.stroke();
+ context.closePath();
+
+ // Display timestamp at bottom of this line if requested, and it won't overlap
+ if (chartOptions.timestampFormatter && gx < textUntilX) {
+ // Formats the timestamp based on user specified formatting function
+ // SmoothieChart.timeFormatter function above is one such formatting option
+ var tx = new Date(t),
+ ts = chartOptions.timestampFormatter(tx),
+ tsWidth = context.measureText(ts).width;
+ textUntilX = gx - tsWidth - 2;
+ context.fillStyle = chartOptions.labels.fillStyle;
+ context.fillText(ts, gx - tsWidth, dimensions.height - 2);
+ }
+ }
+ }
+
+ // Horizontal (value) dividers.
+ for (var v = 1; v < chartOptions.grid.verticalSections; v++) {
+ var gy = Math.round(v * dimensions.height / chartOptions.grid.verticalSections);
+ if (chartOptions.grid.sharpLines) {
+ gy -= 0.5;
+ }
+ context.beginPath();
+ context.moveTo(0, gy);
+ context.lineTo(dimensions.width, gy);
+ context.stroke();
+ context.closePath();
+ }
+ // Bounding rectangle.
+ if (chartOptions.grid.borderVisible) {
+ context.beginPath();
+ context.strokeRect(0, 0, dimensions.width, dimensions.height);
+ context.closePath();
+ }
+ context.restore();
+
+ // Draw any horizontal lines...
+ if (chartOptions.horizontalLines && chartOptions.horizontalLines.length) {
+ for (var hl = 0; hl < chartOptions.horizontalLines.length; hl++) {
+ var line = chartOptions.horizontalLines[hl],
+ hly = Math.round(valueToYPixel(line.value)) - 0.5;
+ context.strokeStyle = line.color || '#ffffff';
+ context.lineWidth = line.lineWidth || 1;
+ context.beginPath();
+ context.moveTo(0, hly);
+ context.lineTo(dimensions.width, hly);
+ context.stroke();
+ context.closePath();
+ }
+ }
+
+ // For each data set...
+ for (var d = 0; d < this.seriesSet.length; d++) {
+ context.save();
+ var timeSeries = this.seriesSet[d].timeSeries,
+ dataSet = timeSeries.data,
+ seriesOptions = this.seriesSet[d].options;
+
+ // Delete old data that's moved off the left of the chart.
+ timeSeries.dropOldData(oldestValidTime, chartOptions.maxDataSetLength);
+
+ // Set style for this dataSet.
+ context.lineWidth = seriesOptions.lineWidth;
+ context.strokeStyle = seriesOptions.strokeStyle;
+ // Draw the line...
+ context.beginPath();
+ // Retain lastX, lastY for calculating the control points of bezier curves.
+ var firstX = 0, lastX = 0, lastY = 0;
+ for (var i = 0; i < dataSet.length && dataSet.length !== 1; i++) {
+ var x = timeToXPixel(dataSet[i][0]),
+ y = valueToYPixel(dataSet[i][1]);
+
+ if (i === 0) {
+ firstX = x;
+ context.moveTo(x, y);
+ } else {
+ switch (chartOptions.interpolation) {
+ case "linear":
+ case "line": {
+ context.lineTo(x,y);
+ break;
+ }
+ case "bezier":
+ default: {
+ // Great explanation of Bezier curves: http://en.wikipedia.org/wiki/Bezier_curve#Quadratic_curves
+ //
+ // Assuming A was the last point in the line plotted and B is the new point,
+ // we draw a curve with control points P and Q as below.
+ //
+ // A---P
+ // |
+ // |
+ // |
+ // Q---B
+ //
+ // Importantly, A and P are at the same y coordinate, as are B and Q. This is
+ // so adjacent curves appear to flow as one.
+ //
+ context.bezierCurveTo( // startPoint (A) is implicit from last iteration of loop
+ Math.round((lastX + x) / 2), lastY, // controlPoint1 (P)
+ Math.round((lastX + x)) / 2, y, // controlPoint2 (Q)
+ x, y); // endPoint (B)
+ break;
+ }
+ }
+ }
+
+ lastX = x; lastY = y;
+ }
+
+ if (dataSet.length > 1) {
+ if (seriesOptions.fillStyle) {
+ // Close up the fill region.
+ context.lineTo(dimensions.width + seriesOptions.lineWidth + 1, lastY);
+ context.lineTo(dimensions.width + seriesOptions.lineWidth + 1, dimensions.height + seriesOptions.lineWidth + 1);
+ context.lineTo(firstX, dimensions.height + seriesOptions.lineWidth);
+ context.fillStyle = seriesOptions.fillStyle;
+ context.fill();
+ }
+
+ if (seriesOptions.strokeStyle && seriesOptions.strokeStyle !== 'none') {
+ context.stroke();
+ }
+ context.closePath();
+ }
+ context.restore();
+ }
+
+ // Draw the axis values on the chart.
+ if (!chartOptions.labels.disabled && !isNaN(this.valueRange.min) && !isNaN(this.valueRange.max)) {
+ var maxValueString = parseFloat(this.valueRange.max).toFixed(chartOptions.labels.precision),
+ minValueString = parseFloat(this.valueRange.min).toFixed(chartOptions.labels.precision);
+ context.fillStyle = chartOptions.labels.fillStyle;
+ context.fillText(maxValueString, dimensions.width - context.measureText(maxValueString).width - 2, chartOptions.labels.fontSize);
+ context.fillText(minValueString, dimensions.width - context.measureText(minValueString).width - 2, dimensions.height - 2);
+ }
+
+ context.restore(); // See .save() above.
+ };
+
+ // Sample timestamp formatting function
+ SmoothieChart.timeFormatter = function(date) {
+ function pad2(number) { return (number < 10 ? '0' : '') + number }
+ return pad2(date.getHours()) + ':' + pad2(date.getMinutes()) + ':' + pad2(date.getSeconds());
+ };
+
+ exports.TimeSeries = TimeSeries;
+ exports.SmoothieChart = SmoothieChart;
+
+})(typeof exports === 'undefined' ? this : exports);
+
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index b9c5edc..7e33c9a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -1,6 +1,6 @@
package edu.uci.ics.asterix.metadata.feeds;
-import java.io.OutputStream;
+import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -46,8 +46,6 @@
private FramePushWait framePushWait;
- private SuperFeedManager sfm;
-
private FeedRuntimeType feedRuntimeType;
private int partition;
@@ -65,7 +63,7 @@
public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
- ExecutorService executorService, FrameTupleAccessor fta) {
+ FrameTupleAccessor fta) {
this.writer = writer;
this.mode = Mode.FORWARD;
this.nodePushable = nodePushable;
@@ -73,14 +71,11 @@
this.policyEnforcer = policyEnforcer;
this.feedRuntimeType = feedRuntimeType;
this.partition = partition;
- this.executorService = executorService;
+ this.executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
if (collectStatistics) {
this.statsOutbox = new LinkedBlockingQueue<Long>();
- Runnable task = new FeedOperatorStatisticsCollector(feedId, statsOutbox, nodePushable);
- executorService.execute(task);
- sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
- framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, sfm, feedId, nodeId, feedRuntimeType,
+ framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
partition, FLUSH_THRESHOLD_TIME);
Timer timer = new Timer();
timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
@@ -127,6 +122,10 @@
writer.open();
}
+ public void reset() {
+ framePushWait.reset();
+ }
+
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
switch (mode) {
@@ -171,7 +170,6 @@
private IOperatorNodePushable nodePushable;
private State state;
private long flushThresholdTime;
- private SuperFeedManager sfm;
private static final String EOL = "\n";
private FeedConnectionId feedId;
private String nodeId;
@@ -180,13 +178,13 @@
private AtomicLong numTuplesInInterval = new AtomicLong(0);
private long period;
private boolean collectThroughput;
+ private FeedMessageService mesgService;
- public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, SuperFeedManager sfm,
- FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
+ public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, FeedConnectionId feedId,
+ String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
this.nodePushable = nodePushable;
this.flushThresholdTime = flushThresholdTime;
this.state = State.INTIALIZED;
- this.sfm = sfm;
this.feedId = feedId;
this.nodeId = nodeId;
this.feedRuntimeType = feedRuntimeType;
@@ -201,15 +199,15 @@
}
+ public void reset() {
+ mesgService = null;
+ }
+
public void notifyFinish(int numTuples) {
state = State.WAITNG_FOR_NEXT_FRAME;
numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
}
- public void resetSuperFeedManager(SuperFeedManager sfm) {
- this.sfm = sfm;
- }
-
@Override
public void run() {
if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
@@ -218,46 +216,39 @@
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
}
- reportCongestionToSFM(currentTime - startTime);
+ sendReportToSFM(currentTime - startTime, FeedReportMessageType.CONGESTION);
}
}
if (collectThroughput) {
- int instantTput = (int) ((numTuplesInInterval.get() * 1000) / period);
- reportThroughputToSFM(instantTput);
+ int instantTput = (int) Math.ceil(new Double(((numTuplesInInterval.get() * 1000) / period)));
+ sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT);
}
numTuplesInInterval.set(0);
}
- private void reportCongestionToSFM(long waitingTime) {
- sendReportToSFM(waitingTime, FeedReportMessageType.CONGESTION);
- }
-
- private void reportThroughputToSFM(long throughput) {
- sendReportToSFM(throughput, FeedReportMessageType.THROUGHPUT);
- }
-
private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType) {
String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
String operator = "" + feedRuntimeType;
- String mesg = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
- + value + "|" + EOL;
-
- Socket sc = null;
- try {
- while (sfm == null) {
- sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
- if (sfm == null) {
- Thread.sleep(2000);
- } else {
- break;
+ String message = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
+ + value + "|" + nodeId + "|" + EOL;
+ if (mesgService == null) {
+ while (mesgService == null) {
+ mesgService = FeedManager.INSTANCE.getFeedMessageService(feedId);
+ if (mesgService == null) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
}
- sc = new Socket(sfm.getHost(), sfm.getPort());
- OutputStream os = sc.getOutputStream();
- os.write(mesg.getBytes());
- } catch (Exception e) {
+ }
+ try {
+ mesgService.sendMessage(message);
+ } catch (IOException ioe) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to report congestion to " + sfm);
+ LOGGER.warning("Unable to send feed report to SFM for feed " + feedId + " " + feedRuntimeType + "["
+ + partition + "]");
}
}
}
@@ -355,6 +346,9 @@
@Override
public void fail() throws HyracksDataException {
writer.fail();
+ if (timer != null) {
+ timer.cancel();
+ }
}
@Override
@@ -378,9 +372,4 @@
return "MaterializingFrameWriter using " + writer;
}
- public void resetSuperFeedManager() {
- sfm = null;
- framePushWait.resetSuperFeedManager(null);
- }
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index c0f6222..3293d26 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -64,10 +64,8 @@
AdapterRuntimeManager adapterRuntimeMgr = null;
try {
if (ingestionRuntime == null) {
- ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, adapterRuntimeMgr);
- ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
- FeedRuntimeType.INGESTION, partition, executorService, fta);
+ FeedRuntimeType.INGESTION, partition, fta);
adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
if (adapter instanceof AbstractFeedDatasourceAdapter) {
@@ -86,6 +84,7 @@
adapter = adapterRuntimeMgr.getFeedAdapter();
writer.open();
adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
+ adapterRuntimeMgr.getAdapterExecutor().getWriter().reset();
adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 6e8cc63..a67e137 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -40,11 +41,16 @@
private Map<FeedConnectionId, SuperFeedManager> superFeedManagers = new HashMap<FeedConnectionId, SuperFeedManager>();
private Map<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>> feedRuntimes = new HashMap<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>>();
private Map<FeedConnectionId, ExecutorService> feedExecutorService = new HashMap<FeedConnectionId, ExecutorService>();
+ private Map<FeedConnectionId, FeedMessageService> feedMessageService = new HashMap<FeedConnectionId, FeedMessageService>();
public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
return feedExecutorService.get(feedId);
}
+ public FeedMessageService getFeedMessageService(FeedConnectionId feedId) {
+ return feedMessageService.get(feedId);
+ }
+
@Override
public void deregisterFeed(FeedConnectionId feedId) {
try {
@@ -67,6 +73,9 @@
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdownNow();
}
+ superFeedManagers.remove(feedId);
+ feedMessageService.remove(feedId);
+
} catch (Exception e) {
e.printStackTrace();
if (LOGGER.isLoggable(Level.WARNING)) {
@@ -76,11 +85,12 @@
}
@Override
- public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) {
- ExecutorService execService = feedExecutorService.get(feedRuntime.getFeedRuntimeId().getFeedId());
+ public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
+ FeedConnectionId feedId = feedRuntime.getFeedRuntimeId().getFeedId();
+ ExecutorService execService = feedExecutorService.get(feedId);
if (execService == null) {
execService = Executors.newCachedThreadPool();
- feedExecutorService.put(feedRuntime.getFeedRuntimeId().getFeedId(), execService);
+ feedExecutorService.put(feedId, execService);
}
Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntime.getFeedRuntimeId()
@@ -120,15 +130,34 @@
}
@Override
- public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) {
+ public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception {
+ boolean overriden = superFeedManagers.get(feedId) != null;
superFeedManagers.put(feedId, sfm);
+ FeedMessageService mesgService = feedMessageService.get(feedId);
+ if (overriden && mesgService != null) {
+ mesgService.stop();
+ }
+ if (mesgService == null || overriden) {
+ mesgService = new FeedMessageService(feedId);
+ feedMessageService.put(feedId, mesgService);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started Feed Message Service for feed :" + feedId);
+ }
+ mesgService.start();
+ }
}
@Override
public void deregisterSuperFeedManager(FeedConnectionId feedId) {
SuperFeedManager sfm = superFeedManagers.remove(feedId);
try {
- sfm.stop();
+ if (sfm.isLocal()) {
+ sfm.stop();
+ }
+ FeedMessageService fms = feedMessageService.remove(feedId);
+ if (fms != null) {
+ fms.stop();
+ }
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
index 103eb94..aedd124 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
@@ -23,11 +23,15 @@
private static final long serialVersionUID = 1L;
- private final SuperFeedManager superFeedMaanger;
+ private final String host;
+ private final String nodeId;
+ private final int port;
- public FeedManagerElectMessage(SuperFeedManager superFeedManager) {
- super(MessageType.SUPER_FEED_MANAGER_ELECT, superFeedManager.getFeedConnectionId());
- this.superFeedMaanger = superFeedManager;
+ public FeedManagerElectMessage(String host, String nodeId, int port, FeedConnectionId feedId) {
+ super(MessageType.SUPER_FEED_MANAGER_ELECT, feedId);
+ this.host = host;
+ this.port = port;
+ this.nodeId = nodeId;
}
@Override
@@ -35,13 +39,21 @@
return MessageType.SUPER_FEED_MANAGER_ELECT;
}
- public SuperFeedManager getSuperFeedMaanger() {
- return superFeedMaanger;
- }
-
@Override
public String toString() {
- return superFeedMaanger.toString();
+ return host + "_" + nodeId + "[" + port + "]";
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getPort() {
+ return port;
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 47e155c..81cbd96 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -74,22 +74,33 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering SUPER Feed MGR for :" + feedId);
}
- SuperFeedManager sfm = ((FeedManagerElectMessage) feedMessage).getSuperFeedMaanger();
+ FeedManagerElectMessage mesg = ((FeedManagerElectMessage) feedMessage);
+ SuperFeedManager sfm = new SuperFeedManager(mesg.getFeedId(), mesg.getHost(), mesg.getNodeId(),
+ mesg.getPort());
synchronized (FeedManager.INSTANCE) {
- if (FeedManager.INSTANCE.getSuperFeedManager(feedId) == null) {
- FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
- INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
- String nodeId = ncCtx.getNodeId();
+ INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
+ String nodeId = ncCtx.getNodeId();
- if (sfm.getNodeId().equals(nodeId)) {
- System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
- sfm.setLocal(true);
- sfm.start();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Started Super Feed Manager for " + feedId);
- }
+ if (sfm.getNodeId().equals(nodeId)) {
+ SuperFeedManager currentManager = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+ if (currentManager != null) {
+ currentManager.stop();
+ FeedManager.INSTANCE.deregisterSuperFeedManager(feedId);
}
+
+ sfm.setLocal(true);
+ sfm.start();
+ System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started Super Feed Manager for " + feedId);
+ }
+ } else {
+ Thread.sleep(5000);
}
+ FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
+ System.out.println("REGISTERED SUPER FEED MANAGER ! + is LOCAL ?" + sfm.isLocal());
+
}
break;
@@ -104,6 +115,7 @@
}
} catch (Exception e) {
+ e.printStackTrace();
throw new HyracksDataException(e);
} finally {
writer.close();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
new file mode 100644
index 0000000..12b22a9
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.CharBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class FeedMessageService {
+
+ private static final char EOL = (char) "\n".getBytes()[0];
+
+ private final FeedConnectionId feedId;
+ private final LinkedBlockingQueue<String> inbox;
+ private final FeedMessageHandler mesgHandler;
+
+ public FeedMessageService(FeedConnectionId feedId) {
+ this.feedId = feedId;
+ this.inbox = new LinkedBlockingQueue<String>();
+ mesgHandler = new FeedMessageHandler(inbox, feedId);
+ }
+
+ public void start() throws UnknownHostException, IOException, Exception {
+ FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgHandler);
+ }
+
+ public void stop() throws IOException {
+ mesgHandler.stop();
+ }
+
+ public void sendMessage(String message) throws IOException {
+ inbox.add(message);
+ }
+
+ private static class FeedMessageHandler implements Runnable {
+
+ private final LinkedBlockingQueue<String> inbox;
+ private final FeedConnectionId feedId;
+ private Socket sfmSocket;
+ private boolean process = true;
+
+ public FeedMessageHandler(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId) {
+ this.inbox = inbox;
+ this.feedId = feedId;
+ }
+
+ public void run() {
+ try {
+ sfmSocket = obtainSFMSocket();
+ while (process) {
+ String message = inbox.take();
+ sfmSocket.getOutputStream().write(message.getBytes());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ System.out.println("STOPPING MESSAGE HANDLER");
+ if (sfmSocket != null) {
+ try {
+ sfmSocket.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ public void stop() {
+ process = false;
+ }
+
+ private Socket obtainSFMSocket() throws UnknownHostException, IOException, Exception {
+ Socket sfmDirServiceSocket = null;
+ FeedMessageService fsm = FeedManager.INSTANCE.getFeedMessageService(feedId);
+ while (fsm == null) {
+ fsm = FeedManager.INSTANCE.getFeedMessageService(feedId);
+ if (fsm == null) {
+ Thread.sleep(2000);
+ } else {
+ break;
+ }
+ }
+ SuperFeedManager sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+
+ System.out.println(" OBTAINED SFM DETAILS WILL TRY TO CONNECT " + sfm);
+
+ try {
+ sfmDirServiceSocket = new Socket(sfm.getHost(), sfm.getPort());
+ System.out.println(" CONNECTED TO " + sfm.getHost() + " " + sfm.getPort());
+
+ while (!sfmDirServiceSocket.isConnected()) {
+ Thread.sleep(2000);
+ }
+ InputStream in = sfmDirServiceSocket.getInputStream();
+ CharBuffer buffer = CharBuffer.allocate(50);
+ char ch = 0;
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array());
+ int port = Integer.parseInt(s.trim());
+ System.out.println("OBTAINED PORT " + port + " WILL CONNECT AT " + sfm.getHost() + " " + port);
+
+ sfmSocket = new Socket(sfm.getHost(), port);
+ } catch (Exception e) {
+ System.out.println(" COULT NOT CONNECT TO " + sfm);
+ e.printStackTrace();
+ throw e;
+ } finally {
+ if (sfmDirServiceSocket != null) {
+ sfmDirServiceSocket.close();
+ }
+ }
+ return sfmSocket;
+ }
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index cdc4d2a..f933504 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -87,26 +87,29 @@
@Override
public void open() throws HyracksDataException {
FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, feedId, partition);
- System.out.println("TRYING TO OBTAIN FEED RUNTIME" + runtimeId);
- feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
- if (feedRuntime == null) {
- feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
- feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+ try {
+ feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
+ if (feedRuntime == null) {
+ feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
+ feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+ }
+ resumeOldState = false;
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
+ }
+ feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
+ resumeOldState = true;
}
- resumeOldState = false;
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
- }
- feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
- resumeOldState = true;
+ FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
+ runtimeType, partition, fta);
+ coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
+ coreOperatorNodePushable.open();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
}
- FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId, runtimeType,
- partition, feedExecService, fta);
- coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
- coreOperatorNodePushable.open();
}
@Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
new file mode 100644
index 0000000..d0a83d8
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager.FeedReportMessageType;
+
+public class FeedReport implements Comparable {
+
+ private FeedConnectionId feedId;
+ private FeedReportMessageType reportType;
+ private int partition = -1;
+ private FeedRuntimeType runtimeType;
+ private long value = -1;
+ private final String[] representation;
+
+ public FeedReport(String rep) {
+ representation = rep.split("\\|");
+ }
+
+ @Override
+ public String toString() {
+ return getFeedId() + " " + getReportType() + " " + getPartition() + " " + getRuntimeType() + " " + getValue();
+ }
+
+ public FeedConnectionId getFeedId() {
+ if (feedId == null) {
+ String feedIdRep = representation[1];
+ String[] feedIdComp = feedIdRep.split(":");
+ feedId = new FeedConnectionId(feedIdComp[0], feedIdComp[1], feedIdComp[2]);
+ }
+ return feedId;
+ }
+
+ public FeedReportMessageType getReportType() {
+ if (reportType == null) {
+ reportType = FeedReportMessageType.valueOf(representation[0].toUpperCase());
+ }
+ return reportType;
+ }
+
+ public int getPartition() {
+ if (partition < 0) {
+ partition = Integer.parseInt(representation[3]);
+ }
+ return partition;
+ }
+
+ public FeedRuntimeType getRuntimeType() {
+ if (runtimeType == null) {
+ runtimeType = FeedRuntimeType.valueOf(representation[2].toUpperCase());
+ }
+ return runtimeType;
+ }
+
+ public long getValue() {
+ if (value < 0) {
+ value = Long.parseLong(representation[4]);
+ }
+ return value;
+ }
+
+ public String[] getRepresentation() {
+ return representation;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (!(o instanceof FeedReport)) {
+ throw new IllegalArgumentException("Incorrect operand type " + o);
+ }
+
+ FeedReport other = (FeedReport) o;
+ if (!other.getReportType().equals(getReportType())) {
+ throw new IllegalArgumentException("Incorrect operand type " + o);
+ }
+
+ int returnValue = 0;
+
+ switch (getReportType()) {
+ case CONGESTION:
+ returnValue = ranking.get(getRuntimeType()) - ranking.get(other.getRuntimeType());
+ break;
+
+ case THROUGHPUT:
+ returnValue = (int) (other.getValue() - getValue());
+ break;
+ }
+
+ return returnValue;
+ }
+
+ private static Map<FeedRuntimeType, Integer> ranking = populateRanking();
+
+ private static Map<FeedRuntimeType, Integer> populateRanking() {
+ Map<FeedRuntimeType, Integer> ranking = new HashMap<FeedRuntimeType, Integer>();
+ ranking.put(FeedRuntimeType.INGESTION, 1);
+ ranking.put(FeedRuntimeType.COMPUTE, 2);
+ ranking.put(FeedRuntimeType.STORAGE, 3);
+ ranking.put(FeedRuntimeType.COMMIT, 4);
+ return ranking;
+ }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index a0be4ff..278fadd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -32,8 +32,9 @@
/**
* @param feedRuntime
+ * @throws Exception
*/
- public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime);
+ public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
/**
* @param feedRuntimeId
@@ -49,8 +50,9 @@
/**
* @param feedId
* @param sfm
+ * @throws Exception
*/
- public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm);
+ public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception;
/**
* @param feedId
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
new file mode 100644
index 0000000..3be39f3
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class MessageListener {
+
+ private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
+
+ private int port;
+ private final LinkedBlockingQueue<String> outbox;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ private MessageListenerServer listenerServer;
+
+ public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
+ this.port = port;
+ this.outbox = outbox;
+ }
+
+ public void stop() {
+ if (!executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
+ listenerServer.stop();
+ System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
+
+ }
+
+ public void start() throws IOException {
+ System.out.println("STARTING MESSAGE RECEIVING SERVICE AT " + port);
+ listenerServer = new MessageListenerServer(port, outbox);
+ executorService.execute(listenerServer);
+ }
+
+ private static class MessageListenerServer implements Runnable {
+
+ private final int port;
+ private final LinkedBlockingQueue<String> outbox;
+ private ServerSocket server;
+
+ public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
+ this.port = port;
+ this.outbox = outbox;
+ }
+
+ public void stop() {
+ try {
+ server.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void run() {
+ char EOL = (char) "\n".getBytes()[0];
+ Socket client = null;
+ try {
+ server = new ServerSocket(port);
+ client = server.accept();
+ InputStream in = client.getInputStream();
+ CharBuffer buffer = CharBuffer.allocate(5000);
+ char ch;
+ while (true) {
+ ch = (char) in.read();
+ if (((int) ch) == -1) {
+ break;
+ }
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array());
+ synchronized (outbox) {
+ outbox.add(s + "\n");
+ }
+ buffer.position(0);
+ buffer.limit(5000);
+ }
+
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start Message listener" + server);
+ }
+ } finally {
+ if (server != null) {
+ try {
+ server.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ }
+
+ private static class MessageParser implements Runnable {
+
+ private Socket client;
+ private IMessageAnalyzer messageAnalyzer;
+ private static final char EOL = (char) "\n".getBytes()[0];
+
+ public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
+ this.client = client;
+ this.messageAnalyzer = messageAnalyzer;
+ }
+
+ @Override
+ public void run() {
+ CharBuffer buffer = CharBuffer.allocate(5000);
+ char ch;
+ try {
+ InputStream in = client.getInputStream();
+ while (true) {
+ ch = (char) in.read();
+ if (((int) ch) == -1) {
+ break;
+ }
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array());
+ synchronized (messageAnalyzer) {
+ messageAnalyzer.getMessageQueue().add(s + "\n");
+ }
+ buffer.position(0);
+ buffer.limit(5000);
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ } finally {
+ try {
+ client.close();
+ } catch (IOException ioe) {
+ // do nothing
+ }
+ }
+ }
+ }
+
+ public static interface IMessageAnalyzer {
+
+ /**
+ * @return
+ */
+ public LinkedBlockingQueue<String> getMessageQueue();
+
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index a51bf6e..fcc9026 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -16,13 +16,12 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.io.Serializable;
+import java.net.ServerSocket;
import java.net.Socket;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -30,13 +29,10 @@
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-public class SuperFeedManager implements Serializable {
+public class SuperFeedManager {
private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
-
- private static final long serialVersionUID = 1L;
private String host;
private final int port;
@@ -45,22 +41,25 @@
private final FeedConnectionId feedConnectionId;
- private MessageListener listener;
+ // private MessageListener listener;
private boolean isLocal = false;
- private transient ExecutorService executorService;
+ private SuperFeedManagerService sfmService;
+
+ private LinkedBlockingQueue<String> inbox;
public enum FeedReportMessageType {
CONGESTION,
THROUGHPUT
}
- public SuperFeedManager(FeedConnectionId feedId, String nodeId, int port) throws Exception {
+ public SuperFeedManager(FeedConnectionId feedId, String host, String nodeId, int port) throws Exception {
this.feedConnectionId = feedId;
this.nodeId = nodeId;
this.port = port;
- initialize();
+ this.host = host;
+ this.inbox = new LinkedBlockingQueue<String>();
}
public int getPort() {
@@ -75,18 +74,6 @@
return nodeId;
}
- private void initialize() throws Exception {
- Map<String, Set<String>> ncs = AsterixRuntimeUtil.getNodeControllerMap();
- for (Entry<String, Set<String>> entry : ncs.entrySet()) {
- String ip = entry.getKey();
- Set<String> nc = entry.getValue();
- if (nc.contains(nodeId)) {
- host = ip;
- break;
- }
- }
- }
-
public FeedConnectionId getFeedConnectionId() {
return feedConnectionId;
}
@@ -100,20 +87,17 @@
}
public void start() throws IOException {
- if (listener == null) {
- if (executorService == null) {
- executorService = Executors.newCachedThreadPool();
- }
- listener = new MessageListener(port, new SuperFeedManagerMessageAnalzer(executorService));
- listener.start();
+ if (sfmService == null) {
+ ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedConnectionId);
+ sfmService = new SuperFeedManagerService(port, inbox, feedConnectionId);
+ executorService.execute(sfmService);
}
+ System.out.println("STARTED SUPER FEED MANAGER!");
}
public void stop() throws IOException {
- if (listener != null) {
- listener.stop();
- }
- executorService.shutdownNow();
+ sfmService.stop();
+ System.out.println("STOPPED SUPER FEED MANAGER!");
}
@Override
@@ -121,19 +105,156 @@
return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
}
+ public static class SuperFeedManagerMessages {
+
+ private static final String EOL = "\n";
+
+ public enum MessageType {
+ FEED_PORT_REQUEST,
+ FEED_PORT_RESPONSE
+ }
+
+ public static final byte[] SEND_PORT_REQUEST = (MessageType.FEED_PORT_REQUEST.name() + EOL).getBytes();
+ }
+
+ private static class SuperFeedManagerService implements Runnable {
+
+ private int nextPort;
+ private ServerSocket server;
+ private String EOM = "\n";
+
+ private final LinkedBlockingQueue<String> inbox;
+ private List<MessageListener> messageListeners;
+ private SFMessageAnalyzer mesgAnalyzer;
+ private final FeedConnectionId feedId;
+ private boolean process = true;
+
+ public SuperFeedManagerService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
+ throws IOException {
+ server = new ServerSocket(port);
+ nextPort = port;
+ this.inbox = inbox;
+ this.feedId = feedId;
+ this.messageListeners = new ArrayList<MessageListener>();
+ mesgAnalyzer = new SFMessageAnalyzer(inbox);
+ FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgAnalyzer);
+ }
+
+ public void stop() {
+ process = false;
+ if (server != null) {
+ try {
+ server.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ mesgAnalyzer.stop();
+ }
+
+ @Override
+ public void run() {
+ Socket client = null;
+ while (true) {
+ try {
+ client = server.accept();
+ OutputStream os = client.getOutputStream();
+ nextPort++;
+ MessageListener listener = new MessageListener(nextPort, inbox);
+ listener.start();
+ messageListeners.add(listener);
+ os.write((nextPort + EOM).getBytes());
+ os.flush();
+ } catch (IOException e) {
+ if (process == false) {
+ break;
+ }
+ e.printStackTrace();
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ private static class SFMessageAnalyzer implements Runnable {
+
+ private final LinkedBlockingQueue<String> inbox;
+ private final Socket socket;
+ private final OutputStream os;
+ private boolean process = true;
+
+ public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox) throws UnknownHostException, IOException {
+ this.inbox = inbox;
+ String ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
+ socket = null; //new Socket(ccHost, 2999);
+ os = null; //socket.getOutputStream();
+ }
+
+ public void stop() {
+ process = false;
+ }
+
+ public void run() {
+ while (process) {
+ try {
+ String message = inbox.take();
+ FeedReport report = new FeedReport(message);
+ FeedReportMessageType mesgType = report.getReportType();
+ switch (mesgType) {
+ case THROUGHPUT:
+ //send message to FeedHealthDataReceiver at CC (2999)
+ // os.write(message.getBytes());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.warning("SuperFeedManager received message " + message);
+ }
+ break;
+ case CONGESTION:
+ // congestionInbox.add(report);
+ break;
+ }
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(message);
+ }
+ if (os != null) {
+ os.close();
+ }
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
private static class SuperFeedManagerMessageAnalzer implements IMessageAnalyzer {
private String ccHost;
private CongestionAnalyzer congestionAnalyzer;
private LinkedBlockingQueue<FeedReport> congestionInbox = new LinkedBlockingQueue<FeedReport>();
+ private ExecutorService executorService;
+ private LinkedBlockingQueue<String> mesgInbox = new LinkedBlockingQueue<String>();
- public SuperFeedManagerMessageAnalzer(ExecutorService executorService) {
+ public SuperFeedManagerMessageAnalzer(FeedConnectionId feedId) {
ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
congestionAnalyzer = new CongestionAnalyzer(congestionInbox);
+ executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
executorService.execute(congestionAnalyzer);
}
- @Override
public void receiveMessage(String message) {
Socket socket = null;
OutputStream os = null;
@@ -146,6 +267,9 @@
socket = new Socket(ccHost, 2999);
os = socket.getOutputStream();
os.write(message.getBytes());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.warning("SuperFeedManager received message " + message);
+ }
break;
case CONGESTION:
congestionInbox.add(report);
@@ -168,6 +292,11 @@
}
}
+ @Override
+ public LinkedBlockingQueue<String> getMessageQueue() {
+ return mesgInbox;
+ }
+
}
private static class CongestionAnalyzer implements Runnable {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 94250d2..acd9d7a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -240,8 +240,8 @@
int currentMeasureTweets = gen.getNumFlushedTweets();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
+ ((currentMeasureTweets - prevMeasuredTweets) / tputDuration) + " ID "
+ Thread.currentThread().getId());
}