1) minor fixes to feed management console 2) added provision for specifying (min,max) tps to the twitter firehose adaptor
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index a032db4..a89b6d5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -63,7 +63,6 @@
}
} else {
obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
- // do null check
}
PrintWriter out = response.getWriter();
@@ -75,7 +74,7 @@
LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
String report = null;
try {
- report = queue.poll(30, TimeUnit.SECONDS);
+ report = queue.poll(5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
@@ -83,35 +82,33 @@
}
private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
- JSONObject obj = null;
+ JSONObject obj = new JSONObject();
try {
MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
List<FeedActivity> feedActivities = MetadataManager.INSTANCE
.getActiveFeeds(ctx, dataverseName, datasetName);
- boolean active = false;
- FeedActivity activity = null;
- for (FeedActivity feedActivity : feedActivities) {
- active = (feedActivity.getFeedName().equals(feedName) && feedActivity.getActivityType().equals(
- FeedActivityType.FEED_BEGIN));
- if (active) {
- activity = feedActivity;
+ FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
+ FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId, null);
+ switch(activity.getActivityType()){
+ case FEED_BEGIN:
+ Map<String, String> activityDetails = activity.getFeedActivityDetails();
+ String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
+ String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+ String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+ obj.put("status", "active");
+ obj.put("type", "reload");
+ obj.put("ingestLocations", ingestLocations);
+ obj.put("computeLocations", computeLocations);
+ obj.put("storageLocations", storageLocations);
+ System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
+ + computeLocations + " storage at " + storageLocations);
break;
- }
- }
- if (active) {
- Map<String, String> activityDetails = activity.getFeedActivityDetails();
- String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
- String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
- String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
- obj = new JSONObject();
- obj.put("type", "reload");
- obj.put("ingestLocations", ingestLocations);
- obj.put("computeLocations", computeLocations);
- obj.put("storageLocations", storageLocations);
- System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
- + computeLocations + " storage at " + storageLocations);
- } else {
- obj = null;
+ case FEED_FAILURE:
+ obj.put("status", "failed");
+ break;
+ case FEED_END:
+ obj.put("status", "ended");
+ break;
}
} catch (Exception e) {
// ignore
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 97a8576..a5f1813 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -5,7 +5,7 @@
<script type="text/javascript">
$(document).ready(function() {
var feedSeries = new TimeSeries();
-
+ var state = "ACTIVE";
var dataverse = "%s";
var dataset = "%s";
var feed = "%s";
@@ -24,7 +24,7 @@
var graphNames = new Array();
$.ajaxSetup({ cache: false });
- setInterval(fetchFeedReport, 500);
+ setInterval(fetchFeedReport, 5000);
function fetchFeedReport() {
$.ajax({
url: '/feed/data?dataverse=' + dataverse + '&dataset=' + dataset + '&feed=' + feed,
@@ -36,26 +36,36 @@
function onFeedReportReceived(data) {
- var type = data["type"];
- if (type == ("reload")) {
- ingestLocations = data["ingestLocations"];
- computeLocations = data["computeLocations"];
- storageLocations = data["storageLocations"];
+ var status = data["status"];
+ if(status == ("ended")){
+ ingestLocations = " ";
+ computeLocations = " ";
+ storageLocations = " ";
+ ingestionPolicy = " ";
+ state = "INACTIVE";
document.location.reload(true);
} else {
- var report = data["value"];
- var tputArray = report.split("|");
- var covered = 0;
- var totalTput = 0;
- for( var i = 0; i < tputArray.length-1; i ++){
- ingestionTimeSeries[i].append(data["time"], tputArray[i]);
- covered++;
- totalTput += parseInt(tputArray[i]);
- }
- for( var j = covered; j < numIngestionNodes; j++){
- ingestionTimeSeries[j].append(data["time"], 0);
- }
- ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+ var type = data["type"];
+ if (type == ("reload")) {
+ ingestLocations = data["ingestLocations"];
+ computeLocations = data["computeLocations"];
+ storageLocations = data["storageLocations"];
+ document.location.reload(true);
+ } else {
+ var report = data["value"];
+ var tputArray = report.split("|");
+ var covered = 0;
+ var totalTput = 0;
+ for( var i = 0; i < tputArray.length-1; i ++){
+ ingestionTimeSeries[i].append(data["time"], tputArray[i]);
+ covered++;
+ totalTput += parseInt(tputArray[i]);
+ }
+ for( var j = covered; j < numIngestionNodes; j++){
+ ingestionTimeSeries[j].append(data["time"], 0);
+ }
+ ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+ }
}
}
@@ -72,7 +82,7 @@
document.write("<br />" + "Compute Locations: " + computeLocations);
document.write("<br />" + "Storage Locations: " + storageLocations);
document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
- document.write("<br />" + "Active since" + activeSince);
+ document.write("<br />" + "Status: " + state);
document.write("<br />");
document.write("<br />");
@@ -103,7 +113,7 @@
}
function drawChart(chartName, ingestionTimeSeries) {
- var ingestionChart = new SmoothieChart({ minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 10 }});
+ var ingestionChart = new SmoothieChart({ timestampFormatter:SmoothieChart.timeFormatter, interpolation:'linear', minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 10 }});
ingestionChart.addTimeSeries(ingestionTimeSeries, seriesOptions);
ingestionChart.streamTo(document.getElementById(chartName, 500));
}