minor changes to feed console
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
index f7c9b16..78bb8bc 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
@@ -40,6 +40,7 @@
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.RemoteSocketMessageListener;
@@ -95,6 +96,8 @@
String feedName = request.getParameter("feed");
String datasetName = request.getParameter("dataset");
String dataverseName = request.getParameter("dataverse");
+ String ingestLocations = request.getParameter("ingestLocations");
+
FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
String outStr = null;
@@ -115,7 +118,14 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" Super Feed Maanger address :" + host + "[" + port + "]");
}
- outStr = String.format(sb.toString(), dataverseName, datasetName, feedName);
+
+ String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+ String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+ String ingestionPolicy = activityDetails.get(FeedActivityDetails.FEED_POLICY_NAME);
+ String activeSince = activity.getLastUpdatedTimestamp();
+
+ outStr = String.format(sb.toString(), dataverseName, datasetName, feedName, ingestLocations,
+ computeLocations, storageLocations, ingestionPolicy, activeSince);
FeedServletUtil.initiateSubscription(feedId, host, port);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index c507e23..3721147 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -23,6 +23,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -42,15 +43,12 @@
String report = getFeedReport(feedName, datasetName, dataverseName);
System.out.println(" RECEIVED REPORT " + report);
long timestamp = System.currentTimeMillis();
- String value = null;
- if (report != null) {
- String[] reportComponents = report.split("\\|");
- value = reportComponents[4];
- }
+
JSONObject obj = new JSONObject();
+ JSONArray array = new JSONArray();
try {
obj.put("time", timestamp);
- obj.put("value", value);
+ obj.put("value", report);
} catch (JSONException jsoe) {
throw new IOException(jsoe);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
index b732276..6936161 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -33,15 +33,12 @@
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
public class FeedServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
-
- private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
-
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
String resourcePath = null;
@@ -93,16 +90,20 @@
MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
List<FeedActivity> lfa = MetadataManager.INSTANCE.getActiveFeeds(ctx, null, null);
StringBuilder ldStr = new StringBuilder();
- ldStr.append("Feeds");
+ ldStr.append("<br />");
+ ldStr.append("<br />");
+ ldStr.append("Active Feeds");
FeedConnectionId feedId = null;
for (FeedActivity feedActivity : lfa) {
feedId = new FeedConnectionId(feedActivity.getDataverseName(), feedActivity.getFeedName(),
feedActivity.getDatasetName());
+ String ingestLocations = feedActivity.getFeedActivityDetails().get(
+ FeedActivityDetails.INGEST_LOCATIONS);
ldStr.append("<br />");
ldStr.append("<br />");
ldStr.append("<a href=\"/feed/dashboard?dataverse=" + feedActivity.getDataverseName() + "&feed="
- + feedActivity.getFeedName() + "&dataset=" + feedActivity.getDatasetName() + "\">" + feedId
- + "</a>");
+ + feedActivity.getFeedName() + "&dataset=" + feedActivity.getDatasetName()
+ + "&ingestLocations=" + ingestLocations + "\">" + feedId + "</a>");
ldStr.append("<br />");
}
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 362072f..5538705 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -5,11 +5,29 @@
<script type="text/javascript">
$(document).ready(function() {
var feedSeries = new TimeSeries();
- var buildGraph = setInterval(fetchFeedReport, 500);
+
+ var dataverse = "%s";
+ var dataset = "%s";
+ var feed = "%s";
+ var ingestLocations = "%s";
+ var computeLocations = "%s";
+ var storageLocations = "%s";
+ var ingestionPolicy = "%s";
+ var activeSince = "%s";
+ var targetUrl = "/feed/data?dataverse=" + dataverse + "&dataset=" + dataset + "&feed=" + feed;
+
+ var ingestionNodes = ingestLocations.split(",");
+ var numIngestionNodes = ingestionNodes.length;
+ var seriesOptions = { strokeStyle: 'rgba(0, 255, 0, 1)', fillStyle: 'rgba(0, 255, 0, 0.2)', lineWidth: 4 };
+ var ingestionTimeSeries = new Array();
+ var graphNames = new Array();
+
+ $.ajaxSetup({ cache: false });
+ setInterval(fetchFeedReport, 500);
function fetchFeedReport() {
$.ajax({
- url: '/feed/data?dataverse=%s&dataset=%s&feed=%s',
+ url: '/feed/data?dataverse=' + dataverse + '&dataset=' + dataset + '&feed=' + feed,
method: 'GET',
dataType: 'json',
success: onFeedReportReceived
@@ -18,12 +36,19 @@
function onFeedReportReceived(data) {
- var tput = data["value"];
- if (tput == null) {
- clearInterval(buildGraph);
- } else {
- feedSeries.append(data["time"], data["value"]);
- }
+ var report = data["value"];
+ var tputArray = report.split("|");
+ var covered = 0;
+ var totalTput = 0;
+ for( var i = 0; i < tputArray.length; i ++){
+ ingestionTimeSeries[i].append(data["time"], tputArray[i]);
+ covered++;
+ totalTput += parseInt(tputArray[i]);
+ }
+ for( var j = covered; j < numIngestionNodes; j++){
+ ingestionTimeSeries[j].append(data["time"], 0);
+ }
+ ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
}
function myYRangeFunction(range) {
@@ -32,22 +57,53 @@
return {min: min, max: max};
}
+ function initTimeline(ingestLocations) {
- function createTimeline() {
- var chart = new SmoothieChart({minValue:0,horizontalLines:[{color:'#ffffff',lineWidth:1,value:0},{color:'#880000',lineWidth:2,value:3333},{color:'#880000',lineWidth:2,value:-3333}]});
- chart.addTimeSeries(feedSeries, { strokeStyle: 'rgba(0, 255, 0, 1)', fillStyle: 'rgba(0, 255, 0, 0.2)', lineWidth: 4 });
- chart.streamTo(document.getElementById("chart"), 500);
+ document.write("<i>" + "Feed Ingestion" + "<i>");
+ document.write("<br />" + "Ingestion Locations: " + ingestLocations);
+ document.write("<br />" + "Compute Locations: " + computeLocations);
+ document.write("<br />" + "Storage Locations: " + storageLocations);
+ document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
+ document.write("<br />" + "Active since" + activeSince);
+ document.write("<br />");
+ document.write("<br />");
+
+ for( var i = 0; i < numIngestionNodes; i++){
+ graphNames[i] = ingestionNodes[i];
+ }
+
+ if(numIngestionNodes > 1){
+ graphNames[numIngestionNodes] = "IngestionThroughput";
+ drawCanvas(graphNames[numIngestionNodes]);
+ ingestionTimeSeries[numIngestionNodes] = new TimeSeries();
+ drawChart(graphNames[numIngestionNodes], ingestionTimeSeries[numIngestionNodes]);
+ }
+
+ for( var j = 0; j < numIngestionNodes; j++){
+ drawCanvas(graphNames[j]);
+ ingestionTimeSeries[j] = new TimeSeries();
+ drawChart(graphNames[j], ingestionTimeSeries[j]);
+ }
}
- createTimeline();
+
+ function drawCanvas(chartName) {
+ document.write("<br />");
+ document.write("<br />");
+ document.write("<i>" + chartName + "</i>");
+ document.write("<br />");
+ document.write("<canvas id="+ "\"" + chartName + "\"" + " " + "width=\"500\" height=\"250\"></canvas>");
+ }
+
+ function drawChart(chartName, ingestionTimeSeries) {
+ var ingestionChart = new SmoothieChart({ minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 4 }});
+ ingestionChart.addTimeSeries(ingestionTimeSeries, seriesOptions);
+ ingestionChart.streamTo(document.getElementById(chartName, 500));
+ }
+
+ initTimeline(ingestLocations);
});
</script>
</head>
- <body>
-
- <p>Feed Ingestion</p>
-
- <canvas id="chart" width="600" height="300"></canvas>
-
- </body>
+ <body></body>
</html>
diff --git a/asterix-app/src/main/resources/feed/home.html b/asterix-app/src/main/resources/feed/home.html
index 6c74cad..5b1721b 100644
--- a/asterix-app/src/main/resources/feed/home.html
+++ b/asterix-app/src/main/resources/feed/home.html
@@ -65,10 +65,9 @@
<div class="span12">
%s
</div>
-
</div>
</div>
-</div>
+ </div>
<div class="footer">
<section class="line"><hr></section>
<section class="content">
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index 2129862..7f89cb9 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -496,7 +496,8 @@
boolean conflictFound = false;
AsterixInstance conflictingInstance = null;
for (AsterixInstance existing : existingInstances) {
- conflictFound = existing.getCluster().getMasterNode().getClusterIp().equals(masterIp);
+ conflictFound = !existing.getState().equals(State.INACTIVE)
+ && existing.getCluster().getMasterNode().getClusterIp().equals(masterIp);
if (conflictFound) {
conflictingInstance = existing;
break;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
index 8648f00..bcc16a7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
@@ -114,8 +114,13 @@
activityDetails.put(key, value);
}
+ String feedActivityTimestamp = ((AString) feedActivityRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX))
+ .getStringValue();
+
FeedActivity fa = new FeedActivity(dataverseName, feedName, datasetName,
FeedActivityType.valueOf(feedActivityType), activityDetails);
+ fa.setLastUpdatedTimestamp(feedActivityTimestamp);
fa.setActivityId(activityId);
return fa;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 6b51332..4d15b8e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -20,7 +20,9 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -307,12 +309,14 @@
private final FeedConnectionId feedId;
private boolean process = true;
private final List<LinkedBlockingQueue<String>> subscriptionQueues;
+ private final Map<String, String> ingestionThroughputs;
public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
throws UnknownHostException, IOException {
this.inbox = inbox;
this.feedId = feedId;
this.subscriptionQueues = new ArrayList<LinkedBlockingQueue<String>>();
+ this.ingestionThroughputs = new HashMap<String, String>();
String ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
}
@@ -329,6 +333,7 @@
}
public void run() {
+ StringBuilder finalMessage = new StringBuilder();
while (process) {
try {
String message = inbox.take();
@@ -340,7 +345,15 @@
LOGGER.warning("SuperFeedManager received message " + message);
}
for (LinkedBlockingQueue<String> q : subscriptionQueues) {
- q.add(message);
+ String[] comp = message.split("\\|");
+ String parition = comp[3];
+ String tput = comp[4];
+ ingestionThroughputs.put(parition, tput);
+ for (String tp : ingestionThroughputs.values()) {
+ finalMessage.append(tp + "|");
+ }
+ q.add(finalMessage.toString());
+ finalMessage.delete(0, finalMessage.length());
}
break;
case CONGESTION: