Added performance counters to networing layer. Fixed sequencing of all signaling messages before CHANNEL_CLOSE_ACK.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@999 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 0b1fea0..2b7563a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -80,6 +80,14 @@
 
     private final long[][] gcCollectionTimes;
 
+    private final long[] netPayloadBytesRead;
+
+    private final long[] netPayloadBytesWritten;
+
+    private final long[] netSignalingBytesRead;
+
+    private final long[] netSignalingBytesWritten;
+
     private int rrdPtr;
 
     private int lastHeartbeatDuration;
@@ -116,6 +124,10 @@
         }
         gcCollectionCounts = new long[gcN][RRD_SIZE];
         gcCollectionTimes = new long[gcN][RRD_SIZE];
+        netPayloadBytesRead = new long[RRD_SIZE];
+        netPayloadBytesWritten = new long[RRD_SIZE];
+        netSignalingBytesRead = new long[RRD_SIZE];
+        netSignalingBytesWritten = new long[RRD_SIZE];
         rrdPtr = 0;
     }
 
@@ -139,6 +151,10 @@
             gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
             gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
         }
+        netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+        netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+        netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+        netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
         rrdPtr = (rrdPtr + 1) % RRD_SIZE;
     }
 
@@ -199,6 +215,10 @@
         o.put("gc-names", gcNames);
         o.put("gc-collection-counts", gcCollectionCounts);
         o.put("gc-collection-times", gcCollectionTimes);
+        o.put("net-payload-bytes-read", netPayloadBytesRead);
+        o.put("net-payload-bytes-written", netPayloadBytesWritten);
+        o.put("net-signaling-bytes-read", netSignalingBytesRead);
+        o.put("net-signaling-bytes-written", netSignalingBytesWritten);
 
         return o;
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index cd9086f..8ca0fa4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -33,4 +33,8 @@
     public double systemLoadAverage;
     public long[] gcCollectionCounts;
     public long[] gcCollectionTimes;
+    public long netPayloadBytesRead;
+    public long netPayloadBytesWritten;
+    public long netSignalingBytesRead;
+    public long netSignalingBytesWritten;
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d5fc27e..ea18ad7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -77,6 +77,7 @@
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -335,6 +336,11 @@
                 hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
                 hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
             }
+            PerformanceCounters netPC = netManager.getPerformanceCounters();
+            hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+            hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+            hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+            hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
             try {
                 cc.nodeHeartbeat(id, hbData);
             } catch (Exception e) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 125e737..8e2a8ad 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
 
 public class NetworkManager {
     private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
@@ -126,4 +127,8 @@
         int receiverIndex = buffer.getInt();
         return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
     }
+
+    public PerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index a281fe8..99140db 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -69,6 +69,9 @@
         private final IBufferAcceptor eba = new IBufferAcceptor() {
             @Override
             public void accept(ByteBuffer buffer) {
+                if (remoteClose.get()) {
+                    return;
+                }
                 int delta;
                 synchronized (ChannelControlBlock.this) {
                     riEmptyQueue.add(buffer);
@@ -102,7 +105,7 @@
             return eba;
         }
 
-        int read(SocketChannel sc, int size) throws IOException {
+        int read(SocketChannel sc, int size) throws IOException, NetException {
             while (true) {
                 if (size <= 0) {
                     return size;
@@ -117,6 +120,9 @@
                     int len;
                     try {
                         len = sc.read(ri.currentReadBuffer);
+                        if (len < 0) {
+                            throw new NetException("Socket Closed");
+                        }
                     } finally {
                         ri.currentReadBuffer.limit(ri.currentReadBuffer.capacity());
                     }
@@ -273,7 +279,7 @@
         wi.writeComplete();
     }
 
-    synchronized int read(SocketChannel sc, int size) throws IOException {
+    synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
         return ri.read(sc, size);
     }
 
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index ab5fd40..9411d42 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -140,6 +140,15 @@
         }
     }
 
+    void unmarkPendingCredits(int channelId) {
+        synchronized (mConn) {
+            if (pendingChannelCreditsBitmap.get(channelId)) {
+                pendingChannelCreditsBitmap.clear(channelId);
+                pendingWriteEventsCounter.decrement();
+            }
+        }
+    }
+
     void markPendingWrite(int channelId) {
         synchronized (mConn) {
             assert !pendingChannelWriteBitmap.get(channelId);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 7f610de..8a21791 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -138,6 +138,7 @@
             int len = writeBuffer.remaining();
             if (len > 0) {
                 int written = sc.write(writeBuffer);
+                muxDemux.getPerformanceCounters().addSignalingBytesWritten(written);
                 if (written < len) {
                     return false;
                 }
@@ -149,6 +150,7 @@
                     try {
                         pendingBuffer.limit(pendingWriteSize + pendingBuffer.position());
                         int written = sc.write(pendingBuffer);
+                        muxDemux.getPerformanceCounters().addPayloadBytesWritten(written);
                         pendingWriteSize -= written;
                     } finally {
                         pendingBuffer.limit(oldLimit);
@@ -266,7 +268,11 @@
     void driveReaderStateMachine() throws IOException, NetException {
         SocketChannel sc = tcpConnection.getSocketChannel();
         if (readerState.readBuffer.remaining() > 0) {
-            sc.read(readerState.readBuffer);
+            int read = sc.read(readerState.readBuffer);
+            if (read < 0) {
+                throw new NetException("Socket Closed");
+            }
+            muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
             if (readerState.readBuffer.remaining() > 0) {
                 return;
             }
@@ -289,7 +295,9 @@
                         ccb = cSet.getCCB(readerState.command.getChannelId());
                     }
                     ccb.reportRemoteEOS();
-                    cSet.markEOSAck(ccb.getChannelId());
+                    int channelId = ccb.getChannelId();
+                    cSet.markEOSAck(channelId);
+                    cSet.unmarkPendingCredits(channelId);
                     break;
                 }
                 case CLOSE_CHANNEL_ACK: {
@@ -312,7 +320,9 @@
                         ccb = cSet.getCCB(readerState.command.getChannelId());
                     }
                     ccb.reportRemoteError(readerState.command.getData());
-                    cSet.markEOSAck(ccb.getChannelId());
+                    int channelId = ccb.getChannelId();
+                    cSet.markEOSAck(channelId);
+                    cSet.unmarkPendingCredits(channelId);
                     break;
                 }
                 case OPEN_CHANNEL: {
@@ -326,7 +336,9 @@
             }
         }
         if (readerState.pendingReadSize > 0) {
-            readerState.pendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+            int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+            muxDemux.getPerformanceCounters().addPayloadBytesRead(newPendingReadSize - readerState.pendingReadSize);
+            readerState.pendingReadSize = newPendingReadSize;
             if (readerState.pendingReadSize > 0) {
                 return;
             }
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 74f1be2..1ed73c5 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -32,6 +32,8 @@
 
     private final TCPEndpoint tcpEndpoint;
 
+    private final PerformanceCounters perfCounters;
+
     public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener) {
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
@@ -57,6 +59,7 @@
                 connection.setAttachment(mConn);
             }
         });
+        perfCounters = new PerformanceCounters();
     }
 
     public void start() throws IOException {
@@ -84,4 +87,8 @@
     public InetSocketAddress getLocalAddress() {
         return tcpEndpoint.getLocalAddress();
     }
+
+    public PerformanceCounters getPerformanceCounters() {
+        return perfCounters;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
new file mode 100644
index 0000000..a203f06
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PerformanceCounters {
+    private final AtomicLong payloadBytesRead;
+
+    private final AtomicLong payloadBytesWritten;
+
+    private final AtomicLong signalingBytesRead;
+
+    private final AtomicLong signalingBytesWritten;
+
+    public PerformanceCounters() {
+        payloadBytesRead = new AtomicLong();
+        payloadBytesWritten = new AtomicLong();
+        signalingBytesRead = new AtomicLong();
+        signalingBytesWritten = new AtomicLong();
+    }
+
+    public void addPayloadBytesRead(long delta) {
+        payloadBytesRead.addAndGet(delta);
+    }
+
+    public long getPayloadBytesRead() {
+        return payloadBytesRead.get();
+    }
+
+    public void addPayloadBytesWritten(long delta) {
+        payloadBytesWritten.addAndGet(delta);
+    }
+
+    public long getPayloadBytesWritten() {
+        return payloadBytesWritten.get();
+    }
+
+    public void addSignalingBytesRead(long delta) {
+        signalingBytesRead.addAndGet(delta);
+    }
+
+    public long getSignalingBytesRead() {
+        return signalingBytesRead.get();
+    }
+
+    public void addSignalingBytesWritten(long delta) {
+        signalingBytesWritten.addAndGet(delta);
+    }
+
+    public long getSignalingBytesWritten() {
+        return signalingBytesWritten.get();
+    }
+}
\ No newline at end of file