Added IPC statistics to admin console
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1159 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 3c94b9e..c17acd0 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -107,6 +107,14 @@
private final long[] netSignalingBytesWritten;
+ private final long[] ipcMessagesSent;
+
+ private final long[] ipcMessageBytesSent;
+
+ private final long[] ipcMessagesReceived;
+
+ private final long[] ipcMessageBytesReceived;
+
private int rrdPtr;
private int lastHeartbeatDuration;
@@ -156,6 +164,11 @@
netPayloadBytesWritten = new long[RRD_SIZE];
netSignalingBytesRead = new long[RRD_SIZE];
netSignalingBytesWritten = new long[RRD_SIZE];
+ ipcMessagesSent = new long[RRD_SIZE];
+ ipcMessageBytesSent = new long[RRD_SIZE];
+ ipcMessagesReceived = new long[RRD_SIZE];
+ ipcMessageBytesReceived = new long[RRD_SIZE];
+
rrdPtr = 0;
}
@@ -183,6 +196,10 @@
netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+ ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+ ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+ ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+ ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
rrdPtr = (rrdPtr + 1) % RRD_SIZE;
}
@@ -254,6 +271,10 @@
o.put("net-payload-bytes-written", netPayloadBytesWritten);
o.put("net-signaling-bytes-read", netSignalingBytesRead);
o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+ o.put("ipc-messages-sent", ipcMessagesSent);
+ o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
+ o.put("ipc-messages-received", ipcMessagesReceived);
+ o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
return o;
}
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html
index 6a765e4..1d8f735 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html
@@ -7,6 +7,7 @@
<li><a href="#tabs-4">Memory Usage</a></li>
<li><a href="#tabs-5">Thread Statistics</a></li>
<li><a href="#tabs-6">Network Statistics</a></li>
+ <li><a href="#tabs-7">IPC Statistics</a></li>
</ul>
<div id="tabs-1">
@@ -54,6 +55,13 @@
<div id='net-signaling-bandwidth' class="time-chart">
</div>
</div>
+
+ <div id="tabs-7">
+ <div id='ipc-messages' class="time-chart">
+ </div>
+ <div id='ipc-message-bytes' class="time-chart">
+ </div>
+ </div>
</div>
<script src="/static/javascript/adminconsole/NodeDetailsPage.js" type="text/javascript"></script>
diff --git a/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index 1847b32..ff9d8a0 100644
--- a/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -15,8 +15,8 @@
}
};
- function computeBandwidth(bytes, rrdPtr) {
- return (bytes[(rrdPtr + 1) % bytes.length] - bytes[rrdPtr]) / 10;
+ function computeRate(array, rrdPtr) {
+ return (array[(rrdPtr + 1) % array.length] - array[rrdPtr]) / 10;
}
function onDataReceived(data) {
@@ -58,6 +58,11 @@
var netPayloadBytesWritten = result['net-payload-bytes-written'];
var netSignalingBytesRead = result['net-signaling-bytes-read'];
var netSignalingBytesWritten = result['net-signaling-bytes-written'];
+ var ipcMessagesSent = result['ipc-messages-sent'];
+ var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
+ var ipcMessagesReceived = result['ipc-messages-received'];
+ var ipcMessageBytesReceived = result['ipc-message-bytes-received'];
+
var sysLoadArray = [];
var heapUsageInitSizesArray = [];
var heapUsageUsedSizesArray = [];
@@ -75,6 +80,10 @@
var netPayloadWriteBWArray = [];
var netSignalingReadBWArray = [];
var netSignalingWriteBWArray = [];
+ var ipcMessageSendRateArray = [];
+ var ipcMessageBytesSendRateArray = [];
+ var ipcMessageReceiveRateArray = [];
+ var ipcMessageBytesReceiveRateArray = [];
var gcChartsDiv = document.getElementById('gc-charts');
for ( var i = 0; i < gcCollectionCounts.length; ++i) {
gcCollectionCountsArray.push([]);
@@ -107,10 +116,14 @@
gcCollectionTimesArray[j].push([ i, gcCollectionTimes[j][rrdPtr] ]);
}
if (i < sysLoad.length - 1) {
- netPayloadReadBWArray.push([ i, computeBandwidth(netPayloadBytesRead, rrdPtr) ]);
- netPayloadWriteBWArray.push([ i, computeBandwidth(netPayloadBytesWritten, rrdPtr) ]);
- netSignalingReadBWArray.push([ i, computeBandwidth(netSignalingBytesRead, rrdPtr) ]);
- netSignalingWriteBWArray.push([ i, computeBandwidth(netSignalingBytesWritten, rrdPtr) ]);
+ netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
+ netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
+ netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
+ netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
+ ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
+ ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
+ ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
+ ipcMessageBytesReceiveRateArray.push([ i, computeRate(ipcMessageBytesReceived, rrdPtr) ]);
}
rrdPtr = (rrdPtr + 1) % sysLoad.length;
}
@@ -186,6 +199,22 @@
label : 'Signaling Write Bandwidth (bytes/sec)',
data : netSignalingWriteBWArray
} ], options);
+
+ $.plot($('#ipc-messages'), [ {
+ label : 'IPC Messages Send Rate (messages/sec)',
+ data : ipcMessageSendRateArray
+ }, {
+ label : 'IPC Messages Receive Rate (messages/sec)',
+ data : ipcMessageReceiveRateArray
+ } ], options);
+
+ $.plot($('#ipc-message-bytes'), [ {
+ label : 'IPC Message Send Bandwidth (bytes/sec)',
+ data : ipcMessageBytesSendRateArray
+ }, {
+ label : 'IPC Message Receive Bandwidth (bytes/sec)',
+ data : ipcMessageBytesReceiveRateArray
+ } ], options);
}
function fetchData() {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 8ca0fa4..1dba3bc 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,4 +37,8 @@
public long netPayloadBytesWritten;
public long netSignalingBytesRead;
public long netSignalingBytesWritten;
+ public long ipcMessagesSent;
+ public long ipcMessageBytesSent;
+ public long ipcMessagesReceived;
+ public long ipcMessageBytesReceived;
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b3eccb5..c3d3c34 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -69,8 +69,9 @@
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
public class NodeControllerService extends AbstractRemoteService {
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -311,11 +312,19 @@
hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
}
- PerformanceCounters netPC = netManager.getPerformanceCounters();
+
+ MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+ IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+ hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+ hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+ hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+ hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
try {
cc.nodeHeartbeat(id, hbData);
} catch (Exception e) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 1a0a820..68e3120 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -35,7 +35,7 @@
import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
public class NetworkManager {
private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
@@ -128,7 +128,7 @@
return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
}
- public PerformanceCounters getPerformanceCounters() {
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
return md.getPerformanceCounters();
}
}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java
new file mode 100644
index 0000000..1873378
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class IPCPerformanceCounters {
+ private final AtomicLong nMessagesSent;
+
+ private final AtomicLong nMessageBytesSent;
+
+ private final AtomicLong nMessagesReceived;
+
+ private final AtomicLong nMessageBytesReceived;
+
+ public IPCPerformanceCounters() {
+ nMessagesSent = new AtomicLong();
+ nMessageBytesSent = new AtomicLong();
+ nMessagesReceived = new AtomicLong();
+ nMessageBytesReceived = new AtomicLong();
+ }
+
+ public long getMessageSentCount() {
+ return nMessagesSent.get();
+ }
+
+ public void addMessageSentCount(long delta) {
+ nMessagesSent.addAndGet(delta);
+ }
+
+ public long getMessageBytesSent() {
+ return nMessageBytesSent.get();
+ }
+
+ public void addMessageBytesSent(long delta) {
+ nMessageBytesSent.addAndGet(delta);
+ }
+
+ public long getMessageReceivedCount() {
+ return nMessagesReceived.get();
+ }
+
+ public void addMessageReceivedCount(long delta) {
+ nMessagesReceived.addAndGet(delta);
+ }
+
+ public long getMessageBytesReceived() {
+ return nMessageBytesReceived.get();
+ }
+
+ public void addMessageBytesReceived(long delta) {
+ nMessageBytesReceived.addAndGet(delta);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 6f62760..8e42d53 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -206,6 +206,7 @@
boolean success = msg.write(buffer);
buffer.flip();
if (success) {
+ system.getPerformanceCounters().addMessageSentCount(1);
SelectionKey key = handle.getKey();
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
@@ -235,6 +236,7 @@
IPCHandle handle = (IPCHandle) key.attachment();
ByteBuffer readBuffer = handle.getInBuffer();
int len = channel.read(readBuffer);
+ system.getPerformanceCounters().addMessageBytesReceived(len);
if (len < 0) {
key.cancel();
channel.close();
@@ -250,6 +252,7 @@
IPCHandle handle = (IPCHandle) key.attachment();
ByteBuffer writeBuffer = handle.getOutBuffer();
int len = channel.write(writeBuffer);
+ system.getPerformanceCounters().addMessageBytesSent(len);
if (len < 0) {
key.cancel();
channel.close();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 3d3bc7a..749afe2 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -141,6 +141,7 @@
message.setFlag(Message.ERROR);
message.setPayload(e);
}
+ system.getPerformanceCounters().addMessageReceivedCount(1);
if (state == HandleState.CONNECT_RECEIVED) {
remoteAddress = (InetSocketAddress) message.getPayload();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 6c9c82c..d7e383d 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
@@ -32,12 +33,15 @@
private final AtomicLong midFactory;
+ private final IPCPerformanceCounters perfCounters;
+
public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
throws IOException {
cMgr = new IPCConnectionManager(this, socketAddress);
this.ipci = ipci;
this.serde = serde;
midFactory = new AtomicLong();
+ perfCounters = new IPCPerformanceCounters();
}
public InetSocketAddress getSocketAddress() {
@@ -82,4 +86,8 @@
IPCConnectionManager getConnectionManager() {
return cMgr;
}
+
+ public IPCPerformanceCounters getPerformanceCounters() {
+ return perfCounters;
+ }
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index b041e2c..64ef6ae7 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -32,7 +32,7 @@
private final TCPEndpoint tcpEndpoint;
- private final PerformanceCounters perfCounters;
+ private final MuxDemuxPerformanceCounters perfCounters;
public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
this.localAddress = localAddress;
@@ -59,7 +59,7 @@
connection.setAttachment(mConn);
}
}, nThreads);
- perfCounters = new PerformanceCounters();
+ perfCounters = new MuxDemuxPerformanceCounters();
}
public void start() throws IOException {
@@ -88,7 +88,7 @@
return tcpEndpoint.getLocalAddress();
}
- public PerformanceCounters getPerformanceCounters() {
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
return perfCounters;
}
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
similarity index 95%
rename from hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
rename to hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
index a203f06..6bf21aa 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
@@ -16,7 +16,7 @@
import java.util.concurrent.atomic.AtomicLong;
-public class PerformanceCounters {
+public class MuxDemuxPerformanceCounters {
private final AtomicLong payloadBytesRead;
private final AtomicLong payloadBytesWritten;
@@ -25,7 +25,7 @@
private final AtomicLong signalingBytesWritten;
- public PerformanceCounters() {
+ public MuxDemuxPerformanceCounters() {
payloadBytesRead = new AtomicLong();
payloadBytesWritten = new AtomicLong();
signalingBytesRead = new AtomicLong();