[NO ISSUE][NET] Log ChannelReadInterface Stats on Failure
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- When the read buffers of a FullFrameChannelReadInterface
are exceeded, log its stats to help in debugging the issue.
- Warn when a partial frame is being written over a full
frame write channel.
- Rename ThreadSafetyGuaranteedBy -> GuardedBy.
- Annotate networking calls with their synchronization
guards.
Change-Id: I89eed0c06dbf4b0e86747538bff286dc37853957
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3663
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Al Hubail <mhubail@uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
index abd401a..17e2744 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
@@ -93,6 +93,11 @@
}
return buffer;
}
+
+ @Override
+ public int getCreatedBuffersCount() {
+ return 0;
+ }
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
index 9e8d931..d1d5b9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
@@ -22,9 +22,20 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
-@FunctionalInterface
public interface IBufferFactory {
- public ByteBuffer createBuffer() throws HyracksDataException;
+ /**
+ * Creates a buffer
+ *
+ * @return the created buffer
+ * @throws HyracksDataException
+ */
+ ByteBuffer createBuffer() throws HyracksDataException;
+ /**
+ * Gets the number of created buffers
+ *
+ * @return the number of created buffers
+ */
+ int getCreatedBuffersCount();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
index 876b3de..ed639f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
@@ -22,7 +22,9 @@
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+@NotThreadSafe
public class ReadBufferFactory implements IBufferFactory {
private final int limit;
@@ -44,4 +46,9 @@
return frame;
}
}
+
+ @Override
+ public int getCreatedBuffersCount() {
+ return counter;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 2a6bdae..260c6b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -35,8 +35,8 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.util.annotations.GuardedBy;
import org.apache.hyracks.util.annotations.NotThreadSafe;
-import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,7 +45,7 @@
* An implementation of IJobQueue that gives more priority to jobs that are submitted earlier.
*/
@NotThreadSafe
-@ThreadSafetyGuaranteedBy("JobManager")
+@GuardedBy("JobManager")
public class FIFOJobQueue implements IJobQueue {
private static final Logger LOGGER = LogManager.getLogger();
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
index 31cb69f..d81517d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelReadInterface;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.util.annotations.GuardedBy;
public abstract class AbstractChannelReadInterface implements IChannelReadInterface {
@@ -31,6 +32,7 @@
protected IBufferAcceptor emptyBufferAcceptor;
protected ByteBuffer currentReadBuffer;
protected IBufferFactory bufferFactory;
+ @GuardedBy("MultiplexConnection")
protected volatile int credits;
@Override
@@ -58,11 +60,13 @@
}
@Override
+ @GuardedBy("MultiplexConnection")
public int getCredits() {
return credits;
}
@Override
+ @GuardedBy("MultiplexConnection")
public void setReadCredits(int credits) {
this.credits = credits;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 5c927f9..741ca8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.comm.IChannelWriteInterface;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.util.annotations.GuardedBy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,6 +43,7 @@
protected boolean channelWritabilityState;
protected final int channelId;
protected IBufferAcceptor eba;
+ @GuardedBy("ChannelControlBlock")
protected int credits;
protected boolean eos;
protected boolean eosSent;
@@ -61,6 +63,7 @@
}
@Override
+ @GuardedBy("ChannelControlBlock")
public void writeComplete() {
if (currentWriteBuffer.remaining() <= 0) {
currentWriteBuffer.clear();
@@ -70,6 +73,7 @@
}
}
+ @GuardedBy("ChannelControlBlock")
private boolean computeWritability() {
boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
if (writableDataPresent) {
@@ -82,6 +86,7 @@
}
@Override
+ @GuardedBy("ChannelControlBlock")
public void adjustChannelWritability() {
boolean writable = computeWritability();
if (writable) {
@@ -97,6 +102,7 @@
}
@Override
+ @GuardedBy("ChannelControlBlock")
public void addCredits(int credit) {
credits += credit;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 3f88630..4ca7f1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.GuardedBy;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -95,10 +96,12 @@
return ri.read(sc, size);
}
+ @GuardedBy("MultiplexConnection")
int getReadCredits() {
return ri.getCredits();
}
+ @GuardedBy("MultiplexConnection")
void setReadCredits(int credits) {
ri.setReadCredits(credits);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index f32e6bf..9d7f848 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.util.annotations.GuardedBy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,6 +37,9 @@
private final Deque<ByteBuffer> riEmptyStack;
private final IChannelControlBlock ccb;
private final Object bufferRecycleLock = new Object();
+ private int frameSize;
+ private long recycledBuffers = 0;
+ private long flushedBuffers = 0;
public FullFrameChannelReadInterface(IChannelControlBlock ccb) {
this.ccb = ccb;
@@ -43,17 +47,22 @@
credits = 0;
emptyBufferAcceptor = buffer -> {
final int delta = buffer.remaining();
+ if (delta != frameSize) {
+ LOGGER.warn("partial frame being recycled; expected size {}, actual size {}", frameSize, delta);
+ }
synchronized (bufferRecycleLock) {
if (ccb.isRemotelyClosed()) {
return;
}
riEmptyStack.push(buffer);
+ recycledBuffers++;
ccb.addPendingCredits(delta);
}
};
}
@Override
+ @GuardedBy("ChannelControlBlock")
public int read(ISocketChannel sc, int size) throws IOException, NetException {
synchronized (bufferRecycleLock) {
while (true) {
@@ -62,16 +71,12 @@
}
if (currentReadBuffer == null) {
currentReadBuffer = riEmptyStack.poll();
- //if current buffer == null and limit not reached
- // factory.createBuffer factory
if (currentReadBuffer == null) {
currentReadBuffer = bufferFactory.createBuffer();
}
}
if (currentReadBuffer == null) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size());
- }
+ logStats();
throw new IllegalStateException(ccb + " read buffers exceeded");
}
int rSize = Math.min(size, currentReadBuffer.remaining());
@@ -95,6 +100,7 @@
}
if (currentReadBuffer.remaining() <= 0) {
flush();
+ flushedBuffers++;
}
}
}
@@ -102,7 +108,16 @@
@Override
public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+ this.frameSize = frameSize;
super.setBufferFactory(bufferFactory, limit, frameSize);
ccb.addPendingCredits(limit * frameSize);
}
+
+ private void logStats() {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn(
+ "{} read buffers exceeded; current empty buffers: {}, created buffers: {}, recycled buffers: {}, flushed buffers: {}",
+ ccb, riEmptyStack.size(), bufferFactory.getCreatedBuffersCount(), recycledBuffers, flushedBuffers);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 3f4618b..a7be3a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.comm.IConnectionWriterState;
import org.apache.hyracks.api.comm.MuxDemuxCommand;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.util.annotations.GuardedBy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -35,6 +36,7 @@
}
@Override
+ @GuardedBy("ChannelControlBlock")
public void write(IConnectionWriterState writerState) throws NetException {
if (currentWriteBuffer == null) {
currentWriteBuffer = wiFullQueue.poll();
@@ -43,6 +45,9 @@
int size = Math.min(currentWriteBuffer.remaining(), credits);
if (size > 0) {
credits -= size;
+ if (credits % currentWriteBuffer.capacity() != 0) {
+ LOGGER.warn("partial frame being written on {}", ccb);
+ }
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.DATA);
writerState.getCommand().setData(size);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 64c1a53..061c6eee 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -34,7 +34,7 @@
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
+import org.apache.hyracks.util.annotations.GuardedBy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -406,6 +406,7 @@
int channelId = readerState.command.getChannelId();
ccb = cSet.registerChannel(channelId);
muxDemux.getChannelOpenListener().channelOpened(ccb);
+ break;
}
}
if (LOGGER.isTraceEnabled()) {
@@ -429,7 +430,7 @@
return muxDemux.getChannelInterfaceFactory();
}
- @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+ @GuardedBy("MultiplexedConnection.this")
private class EventCounter implements IEventCounter {
private int counter;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
index ef6048d..2180d1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -176,5 +176,11 @@
counter++;
return ByteBuffer.allocate(frameSize);
}
+
+ @Override
+ public int getCreatedBuffersCount() {
+ return counter;
+ }
+
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java
similarity index 92%
rename from hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java
rename to hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java
index 7ca1a94..0ce2727 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java
@@ -29,8 +29,8 @@
* to be thread safe by {@link #value()}
*/
@Documented
-@Target({ ElementType.TYPE, ElementType.METHOD })
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD })
@Retention(RetentionPolicy.SOURCE)
-public @interface ThreadSafetyGuaranteedBy {
+public @interface GuardedBy {
String value();
}