Added IPC statistics to admin console

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1159 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 3c94b9e..c17acd0 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -107,6 +107,14 @@
 
     private final long[] netSignalingBytesWritten;
 
+    private final long[] ipcMessagesSent;
+
+    private final long[] ipcMessageBytesSent;
+
+    private final long[] ipcMessagesReceived;
+
+    private final long[] ipcMessageBytesReceived;
+
     private int rrdPtr;
 
     private int lastHeartbeatDuration;
@@ -156,6 +164,11 @@
         netPayloadBytesWritten = new long[RRD_SIZE];
         netSignalingBytesRead = new long[RRD_SIZE];
         netSignalingBytesWritten = new long[RRD_SIZE];
+        ipcMessagesSent = new long[RRD_SIZE];
+        ipcMessageBytesSent = new long[RRD_SIZE];
+        ipcMessagesReceived = new long[RRD_SIZE];
+        ipcMessageBytesReceived = new long[RRD_SIZE];
+
         rrdPtr = 0;
     }
 
@@ -183,6 +196,10 @@
         netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
         netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
         netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+        ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+        ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+        ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+        ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
         rrdPtr = (rrdPtr + 1) % RRD_SIZE;
     }
 
@@ -254,6 +271,10 @@
         o.put("net-payload-bytes-written", netPayloadBytesWritten);
         o.put("net-signaling-bytes-read", netSignalingBytesRead);
         o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+        o.put("ipc-messages-sent", ipcMessagesSent);
+        o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
+        o.put("ipc-messages-received", ipcMessagesReceived);
+        o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
 
         return o;
     }
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html
index 6a765e4..1d8f735 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/NodeDetailsPage.html
@@ -7,6 +7,7 @@
             <li><a href="#tabs-4">Memory Usage</a></li>
             <li><a href="#tabs-5">Thread Statistics</a></li>
             <li><a href="#tabs-6">Network Statistics</a></li>
+            <li><a href="#tabs-7">IPC Statistics</a></li>
         </ul>
         
         <div id="tabs-1">
@@ -54,6 +55,13 @@
             <div id='net-signaling-bandwidth' class="time-chart">
             </div>
         </div>
+
+        <div id="tabs-7">
+            <div id='ipc-messages' class="time-chart">
+            </div>
+            <div id='ipc-message-bytes' class="time-chart">
+            </div>
+        </div>
     </div>
 
     <script src="/static/javascript/adminconsole/NodeDetailsPage.js" type="text/javascript"></script>
diff --git a/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index 1847b32..ff9d8a0 100644
--- a/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -15,8 +15,8 @@
         }
     };
 
-    function computeBandwidth(bytes, rrdPtr) {
-        return (bytes[(rrdPtr + 1) % bytes.length] - bytes[rrdPtr]) / 10;
+    function computeRate(array, rrdPtr) {
+        return (array[(rrdPtr + 1) % array.length] - array[rrdPtr]) / 10;
     }
 
     function onDataReceived(data) {
@@ -58,6 +58,11 @@
         var netPayloadBytesWritten = result['net-payload-bytes-written'];
         var netSignalingBytesRead = result['net-signaling-bytes-read'];
         var netSignalingBytesWritten = result['net-signaling-bytes-written'];
+        var ipcMessagesSent = result['ipc-messages-sent'];
+        var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
+        var ipcMessagesReceived = result['ipc-messages-received'];
+        var ipcMessageBytesReceived = result['ipc-message-bytes-received'];
+
         var sysLoadArray = [];
         var heapUsageInitSizesArray = [];
         var heapUsageUsedSizesArray = [];
@@ -75,6 +80,10 @@
         var netPayloadWriteBWArray = [];
         var netSignalingReadBWArray = [];
         var netSignalingWriteBWArray = [];
+        var ipcMessageSendRateArray = [];
+        var ipcMessageBytesSendRateArray = [];
+        var ipcMessageReceiveRateArray = [];
+        var ipcMessageBytesReceiveRateArray = [];
         var gcChartsDiv = document.getElementById('gc-charts');
         for ( var i = 0; i < gcCollectionCounts.length; ++i) {
             gcCollectionCountsArray.push([]);
@@ -107,10 +116,14 @@
                 gcCollectionTimesArray[j].push([ i, gcCollectionTimes[j][rrdPtr] ]);
             }
             if (i < sysLoad.length - 1) {
-                netPayloadReadBWArray.push([ i, computeBandwidth(netPayloadBytesRead, rrdPtr) ]);
-                netPayloadWriteBWArray.push([ i, computeBandwidth(netPayloadBytesWritten, rrdPtr) ]);
-                netSignalingReadBWArray.push([ i, computeBandwidth(netSignalingBytesRead, rrdPtr) ]);
-                netSignalingWriteBWArray.push([ i, computeBandwidth(netSignalingBytesWritten, rrdPtr) ]);
+                netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
+                netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
+                netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
+                netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
+                ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
+                ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
+                ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
+                ipcMessageBytesReceiveRateArray.push([ i, computeRate(ipcMessageBytesReceived, rrdPtr) ]);
             }
             rrdPtr = (rrdPtr + 1) % sysLoad.length;
         }
@@ -186,6 +199,22 @@
             label : 'Signaling Write Bandwidth (bytes/sec)',
             data : netSignalingWriteBWArray
         } ], options);
+
+        $.plot($('#ipc-messages'), [ {
+            label : 'IPC Messages Send Rate (messages/sec)',
+            data : ipcMessageSendRateArray
+        }, {
+            label : 'IPC Messages Receive Rate (messages/sec)',
+            data : ipcMessageReceiveRateArray
+        } ], options);
+
+        $.plot($('#ipc-message-bytes'), [ {
+            label : 'IPC Message Send Bandwidth (bytes/sec)',
+            data : ipcMessageBytesSendRateArray
+        }, {
+            label : 'IPC Message Receive Bandwidth (bytes/sec)',
+            data : ipcMessageBytesReceiveRateArray
+        } ], options);
     }
 
     function fetchData() {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 8ca0fa4..1dba3bc 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,4 +37,8 @@
     public long netPayloadBytesWritten;
     public long netSignalingBytesRead;
     public long netSignalingBytesWritten;
+    public long ipcMessagesSent;
+    public long ipcMessageBytesSent;
+    public long ipcMessagesReceived;
+    public long ipcMessageBytesReceived;
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b3eccb5..c3d3c34 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -69,8 +69,9 @@
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NodeControllerService extends AbstractRemoteService {
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -311,11 +312,19 @@
                 hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
                 hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
             }
-            PerformanceCounters netPC = netManager.getPerformanceCounters();
+
+            MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
             hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
             hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
             hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
             hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+            IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+            hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+            hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+            hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+            hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
             try {
                 cc.nodeHeartbeat(id, hbData);
             } catch (Exception e) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 1a0a820..68e3120 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -35,7 +35,7 @@
 import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NetworkManager {
     private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
@@ -128,7 +128,7 @@
         return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
     }
 
-    public PerformanceCounters getPerformanceCounters() {
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return md.getPerformanceCounters();
     }
 }
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java
new file mode 100644
index 0000000..1873378
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPCPerformanceCounters.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class IPCPerformanceCounters {
+    private final AtomicLong nMessagesSent;
+
+    private final AtomicLong nMessageBytesSent;
+
+    private final AtomicLong nMessagesReceived;
+
+    private final AtomicLong nMessageBytesReceived;
+
+    public IPCPerformanceCounters() {
+        nMessagesSent = new AtomicLong();
+        nMessageBytesSent = new AtomicLong();
+        nMessagesReceived = new AtomicLong();
+        nMessageBytesReceived = new AtomicLong();
+    }
+
+    public long getMessageSentCount() {
+        return nMessagesSent.get();
+    }
+
+    public void addMessageSentCount(long delta) {
+        nMessagesSent.addAndGet(delta);
+    }
+
+    public long getMessageBytesSent() {
+        return nMessageBytesSent.get();
+    }
+
+    public void addMessageBytesSent(long delta) {
+        nMessageBytesSent.addAndGet(delta);
+    }
+
+    public long getMessageReceivedCount() {
+        return nMessagesReceived.get();
+    }
+
+    public void addMessageReceivedCount(long delta) {
+        nMessagesReceived.addAndGet(delta);
+    }
+
+    public long getMessageBytesReceived() {
+        return nMessageBytesReceived.get();
+    }
+
+    public void addMessageBytesReceived(long delta) {
+        nMessageBytesReceived.addAndGet(delta);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 6f62760..8e42d53 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -206,6 +206,7 @@
                                         boolean success = msg.write(buffer);
                                         buffer.flip();
                                         if (success) {
+                                            system.getPerformanceCounters().addMessageSentCount(1);
                                             SelectionKey key = handle.getKey();
                                             key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                                         } else {
@@ -235,6 +236,7 @@
                                 IPCHandle handle = (IPCHandle) key.attachment();
                                 ByteBuffer readBuffer = handle.getInBuffer();
                                 int len = channel.read(readBuffer);
+                                system.getPerformanceCounters().addMessageBytesReceived(len);
                                 if (len < 0) {
                                     key.cancel();
                                     channel.close();
@@ -250,6 +252,7 @@
                                 IPCHandle handle = (IPCHandle) key.attachment();
                                 ByteBuffer writeBuffer = handle.getOutBuffer();
                                 int len = channel.write(writeBuffer);
+                                system.getPerformanceCounters().addMessageBytesSent(len);
                                 if (len < 0) {
                                     key.cancel();
                                     channel.close();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 3d3bc7a..749afe2 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -141,6 +141,7 @@
                 message.setFlag(Message.ERROR);
                 message.setPayload(e);
             }
+            system.getPerformanceCounters().addMessageReceivedCount(1);
 
             if (state == HandleState.CONNECT_RECEIVED) {
                 remoteAddress = (InetSocketAddress) message.getPayload();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 6c9c82c..d7e383d 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
 import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 
@@ -32,12 +33,15 @@
 
     private final AtomicLong midFactory;
 
+    private final IPCPerformanceCounters perfCounters;
+
     public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
             throws IOException {
         cMgr = new IPCConnectionManager(this, socketAddress);
         this.ipci = ipci;
         this.serde = serde;
         midFactory = new AtomicLong();
+        perfCounters = new IPCPerformanceCounters();
     }
 
     public InetSocketAddress getSocketAddress() {
@@ -82,4 +86,8 @@
     IPCConnectionManager getConnectionManager() {
         return cMgr;
     }
+
+    public IPCPerformanceCounters getPerformanceCounters() {
+        return perfCounters;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index b041e2c..64ef6ae7 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -32,7 +32,7 @@
 
     private final TCPEndpoint tcpEndpoint;
 
-    private final PerformanceCounters perfCounters;
+    private final MuxDemuxPerformanceCounters perfCounters;
 
     public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
         this.localAddress = localAddress;
@@ -59,7 +59,7 @@
                 connection.setAttachment(mConn);
             }
         }, nThreads);
-        perfCounters = new PerformanceCounters();
+        perfCounters = new MuxDemuxPerformanceCounters();
     }
 
     public void start() throws IOException {
@@ -88,7 +88,7 @@
         return tcpEndpoint.getLocalAddress();
     }
 
-    public PerformanceCounters getPerformanceCounters() {
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return perfCounters;
     }
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
similarity index 95%
rename from hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
rename to hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
index a203f06..6bf21aa 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxPerformanceCounters.java
@@ -16,7 +16,7 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class PerformanceCounters {
+public class MuxDemuxPerformanceCounters {
     private final AtomicLong payloadBytesRead;
 
     private final AtomicLong payloadBytesWritten;
@@ -25,7 +25,7 @@
 
     private final AtomicLong signalingBytesWritten;
 
-    public PerformanceCounters() {
+    public MuxDemuxPerformanceCounters() {
         payloadBytesRead = new AtomicLong();
         payloadBytesWritten = new AtomicLong();
         signalingBytesRead = new AtomicLong();