ensuring proper shutdown of feed (checkpoint 1)
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
index 6936161..65b057f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -92,7 +92,11 @@
StringBuilder ldStr = new StringBuilder();
ldStr.append("<br />");
ldStr.append("<br />");
- ldStr.append("Active Feeds");
+ if (lfa == null || lfa.isEmpty()) {
+ ldStr.append("Currently there are no active feeds in the Asterix");
+ } else {
+ ldStr.append("Active Feeds");
+ }
FeedConnectionId feedId = null;
for (FeedActivity feedActivity : lfa) {
feedId = new FeedConnectionId(feedActivity.getDataverseName(), feedActivity.getFeedName(),
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 5538705..aa25bd8 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -40,7 +40,7 @@
var tputArray = report.split("|");
var covered = 0;
var totalTput = 0;
- for( var i = 0; i < tputArray.length; i ++){
+ for( var i = 0; i < tputArray.length-1; i ++){
ingestionTimeSeries[i].append(data["time"], tputArray[i]);
covered++;
totalTput += parseInt(tputArray[i]);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index e26c16b..44ad1e9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -38,6 +38,8 @@
private int partition;
+ private IngestionRuntime ingestionRuntime;
+
public enum State {
/*
* Indicates that data from external source will be pushed downstream for storage
@@ -64,7 +66,7 @@
@Override
public void start() throws Exception {
state = State.ACTIVE_INGESTION;
- FeedRuntime ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
+ ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
@@ -191,4 +193,8 @@
return partition;
}
+ public IngestionRuntime getIngestionRuntime() {
+ return ingestionRuntime;
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index dd46d03..698e2ab 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -74,9 +74,10 @@
this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
if (collectStatistics) {
this.statsOutbox = new LinkedBlockingQueue<Long>();
+ timer = new Timer();
framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
- partition, FLUSH_THRESHOLD_TIME);
- Timer timer = new Timer();
+ partition, FLUSH_THRESHOLD_TIME, timer);
+
timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
}
this.fta = fta;
@@ -180,7 +181,7 @@
private FeedMessageService mesgService;
public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, FeedConnectionId feedId,
- String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
+ String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period, Timer timer) {
this.nodePushable = nodePushable;
this.flushThresholdTime = flushThresholdTime;
this.state = State.INTIALIZED;
@@ -346,15 +347,15 @@
@Override
public void fail() throws HyracksDataException {
writer.fail();
- if (timer != null) {
- timer.cancel();
+ if (framePushWait != null) {
+ framePushWait.cancel();
}
}
@Override
public void close() throws HyracksDataException {
- if (timer != null) {
- timer.cancel();
+ if (framePushWait != null) {
+ framePushWait.cancel();
}
writer.close();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 4d44f4c..ca73324 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -87,6 +87,7 @@
adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
}
+ ingestionRuntime = adapterRuntimeMgr.getIngestionRuntime();
synchronized (adapterRuntimeMgr) {
while (!adapterRuntimeMgr.getState().equals(State.FINISHED_INGESTION)) {
adapterRuntimeMgr.wait();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 227b103..83060f2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -59,6 +59,9 @@
LOGGER.warning("unknown feed id: " + feedId);
}
} else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closing feed runtime manager: " + mgr);
+ }
mgr.close();
}
} catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index a2a1546..8838ad4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -67,7 +67,6 @@
LOGGER.info("Terminating ingestion for :" + feedId);
}
}
- FeedManager.INSTANCE.deRegisterFeedRuntime(runtimeId);
break;
case SUPER_FEED_MANAGER_ELECT:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 6523914..78b03ee 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -190,6 +190,7 @@
@Override
public void close() throws HyracksDataException {
coreOperatorNodePushable.close();
+ FeedManager.INSTANCE.deRegisterFeedRuntime(feedRuntime.getFeedRuntimeId());
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
index 8308845..3b2eaaf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
@@ -40,12 +40,6 @@
public void close() throws IOException {
socketFactory.close();
- if (executorService != null) {
- executorService.shutdownNow();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + feedId);
- }
- }
if (messageService != null) {
messageService.stop();
@@ -59,6 +53,13 @@
LOGGER.info("Shut down super feed manager for :" + feedId);
}
}
+
+ if (executorService != null) {
+ executorService.shutdownNow();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shut down executor service for :" + feedId);
+ }
+ }
}
public void setSuperFeedManager(SuperFeedManager sfm) throws UnknownHostException, IOException, Exception {