Added performance counters to networing layer. Fixed sequencing of all signaling messages before CHANNEL_CLOSE_ACK.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@999 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 0b1fea0..2b7563a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -80,6 +80,14 @@
private final long[][] gcCollectionTimes;
+ private final long[] netPayloadBytesRead;
+
+ private final long[] netPayloadBytesWritten;
+
+ private final long[] netSignalingBytesRead;
+
+ private final long[] netSignalingBytesWritten;
+
private int rrdPtr;
private int lastHeartbeatDuration;
@@ -116,6 +124,10 @@
}
gcCollectionCounts = new long[gcN][RRD_SIZE];
gcCollectionTimes = new long[gcN][RRD_SIZE];
+ netPayloadBytesRead = new long[RRD_SIZE];
+ netPayloadBytesWritten = new long[RRD_SIZE];
+ netSignalingBytesRead = new long[RRD_SIZE];
+ netSignalingBytesWritten = new long[RRD_SIZE];
rrdPtr = 0;
}
@@ -139,6 +151,10 @@
gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
}
+ netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+ netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+ netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+ netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
rrdPtr = (rrdPtr + 1) % RRD_SIZE;
}
@@ -199,6 +215,10 @@
o.put("gc-names", gcNames);
o.put("gc-collection-counts", gcCollectionCounts);
o.put("gc-collection-times", gcCollectionTimes);
+ o.put("net-payload-bytes-read", netPayloadBytesRead);
+ o.put("net-payload-bytes-written", netPayloadBytesWritten);
+ o.put("net-signaling-bytes-read", netSignalingBytesRead);
+ o.put("net-signaling-bytes-written", netSignalingBytesWritten);
return o;
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index cd9086f..8ca0fa4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -33,4 +33,8 @@
public double systemLoadAverage;
public long[] gcCollectionCounts;
public long[] gcCollectionTimes;
+ public long netPayloadBytesRead;
+ public long netPayloadBytesWritten;
+ public long netSignalingBytesRead;
+ public long netSignalingBytesWritten;
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index d5fc27e..ea18ad7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -77,6 +77,7 @@
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -335,6 +336,11 @@
hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
}
+ PerformanceCounters netPC = netManager.getPerformanceCounters();
+ hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+ hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+ hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+ hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
try {
cc.nodeHeartbeat(id, hbData);
} catch (Exception e) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 125e737..8e2a8ad 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
public class NetworkManager {
private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
@@ -126,4 +127,8 @@
int receiverIndex = buffer.getInt();
return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
}
+
+ public PerformanceCounters getPerformanceCounters() {
+ return md.getPerformanceCounters();
+ }
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index a281fe8..99140db 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -69,6 +69,9 @@
private final IBufferAcceptor eba = new IBufferAcceptor() {
@Override
public void accept(ByteBuffer buffer) {
+ if (remoteClose.get()) {
+ return;
+ }
int delta;
synchronized (ChannelControlBlock.this) {
riEmptyQueue.add(buffer);
@@ -102,7 +105,7 @@
return eba;
}
- int read(SocketChannel sc, int size) throws IOException {
+ int read(SocketChannel sc, int size) throws IOException, NetException {
while (true) {
if (size <= 0) {
return size;
@@ -117,6 +120,9 @@
int len;
try {
len = sc.read(ri.currentReadBuffer);
+ if (len < 0) {
+ throw new NetException("Socket Closed");
+ }
} finally {
ri.currentReadBuffer.limit(ri.currentReadBuffer.capacity());
}
@@ -273,7 +279,7 @@
wi.writeComplete();
}
- synchronized int read(SocketChannel sc, int size) throws IOException {
+ synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
return ri.read(sc, size);
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index ab5fd40..9411d42 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -140,6 +140,15 @@
}
}
+ void unmarkPendingCredits(int channelId) {
+ synchronized (mConn) {
+ if (pendingChannelCreditsBitmap.get(channelId)) {
+ pendingChannelCreditsBitmap.clear(channelId);
+ pendingWriteEventsCounter.decrement();
+ }
+ }
+ }
+
void markPendingWrite(int channelId) {
synchronized (mConn) {
assert !pendingChannelWriteBitmap.get(channelId);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 7f610de..8a21791 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -138,6 +138,7 @@
int len = writeBuffer.remaining();
if (len > 0) {
int written = sc.write(writeBuffer);
+ muxDemux.getPerformanceCounters().addSignalingBytesWritten(written);
if (written < len) {
return false;
}
@@ -149,6 +150,7 @@
try {
pendingBuffer.limit(pendingWriteSize + pendingBuffer.position());
int written = sc.write(pendingBuffer);
+ muxDemux.getPerformanceCounters().addPayloadBytesWritten(written);
pendingWriteSize -= written;
} finally {
pendingBuffer.limit(oldLimit);
@@ -266,7 +268,11 @@
void driveReaderStateMachine() throws IOException, NetException {
SocketChannel sc = tcpConnection.getSocketChannel();
if (readerState.readBuffer.remaining() > 0) {
- sc.read(readerState.readBuffer);
+ int read = sc.read(readerState.readBuffer);
+ if (read < 0) {
+ throw new NetException("Socket Closed");
+ }
+ muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
if (readerState.readBuffer.remaining() > 0) {
return;
}
@@ -289,7 +295,9 @@
ccb = cSet.getCCB(readerState.command.getChannelId());
}
ccb.reportRemoteEOS();
- cSet.markEOSAck(ccb.getChannelId());
+ int channelId = ccb.getChannelId();
+ cSet.markEOSAck(channelId);
+ cSet.unmarkPendingCredits(channelId);
break;
}
case CLOSE_CHANNEL_ACK: {
@@ -312,7 +320,9 @@
ccb = cSet.getCCB(readerState.command.getChannelId());
}
ccb.reportRemoteError(readerState.command.getData());
- cSet.markEOSAck(ccb.getChannelId());
+ int channelId = ccb.getChannelId();
+ cSet.markEOSAck(channelId);
+ cSet.unmarkPendingCredits(channelId);
break;
}
case OPEN_CHANNEL: {
@@ -326,7 +336,9 @@
}
}
if (readerState.pendingReadSize > 0) {
- readerState.pendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+ int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+ muxDemux.getPerformanceCounters().addPayloadBytesRead(newPendingReadSize - readerState.pendingReadSize);
+ readerState.pendingReadSize = newPendingReadSize;
if (readerState.pendingReadSize > 0) {
return;
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 74f1be2..1ed73c5 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -32,6 +32,8 @@
private final TCPEndpoint tcpEndpoint;
+ private final PerformanceCounters perfCounters;
+
public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener) {
this.localAddress = localAddress;
this.channelOpenListener = listener;
@@ -57,6 +59,7 @@
connection.setAttachment(mConn);
}
});
+ perfCounters = new PerformanceCounters();
}
public void start() throws IOException {
@@ -84,4 +87,8 @@
public InetSocketAddress getLocalAddress() {
return tcpEndpoint.getLocalAddress();
}
+
+ public PerformanceCounters getPerformanceCounters() {
+ return perfCounters;
+ }
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
new file mode 100644
index 0000000..a203f06
--- /dev/null
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/PerformanceCounters.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PerformanceCounters {
+ private final AtomicLong payloadBytesRead;
+
+ private final AtomicLong payloadBytesWritten;
+
+ private final AtomicLong signalingBytesRead;
+
+ private final AtomicLong signalingBytesWritten;
+
+ public PerformanceCounters() {
+ payloadBytesRead = new AtomicLong();
+ payloadBytesWritten = new AtomicLong();
+ signalingBytesRead = new AtomicLong();
+ signalingBytesWritten = new AtomicLong();
+ }
+
+ public void addPayloadBytesRead(long delta) {
+ payloadBytesRead.addAndGet(delta);
+ }
+
+ public long getPayloadBytesRead() {
+ return payloadBytesRead.get();
+ }
+
+ public void addPayloadBytesWritten(long delta) {
+ payloadBytesWritten.addAndGet(delta);
+ }
+
+ public long getPayloadBytesWritten() {
+ return payloadBytesWritten.get();
+ }
+
+ public void addSignalingBytesRead(long delta) {
+ signalingBytesRead.addAndGet(delta);
+ }
+
+ public long getSignalingBytesRead() {
+ return signalingBytesRead.get();
+ }
+
+ public void addSignalingBytesWritten(long delta) {
+ signalingBytesWritten.addAndGet(delta);
+ }
+
+ public long getSignalingBytesWritten() {
+ return signalingBytesWritten.get();
+ }
+}
\ No newline at end of file