[NO ISSUE][NET] Log ChannelReadInterface Stats on Failure

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- When the read buffers of a FullFrameChannelReadInterface
  are exceeded, log its stats to help in debugging the issue.
- Warn when a partial frame is being written over a full
  frame write channel.
- Rename ThreadSafetyGuaranteedBy -> GuardedBy.
- Annotate networking calls with their synchronization
  guards.

Change-Id: I89eed0c06dbf4b0e86747538bff286dc37853957
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3663
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Al Hubail <mhubail@uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
index abd401a..17e2744 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
@@ -93,6 +93,11 @@
             }
             return buffer;
         }
+
+        @Override
+        public int getCreatedBuffersCount() {
+            return 0;
+        }
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
index 9e8d931..d1d5b9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
@@ -22,9 +22,20 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-@FunctionalInterface
 public interface IBufferFactory {
 
-    public ByteBuffer createBuffer() throws HyracksDataException;
+    /**
+     * Creates a buffer
+     *
+     * @return the created buffer
+     * @throws HyracksDataException
+     */
+    ByteBuffer createBuffer() throws HyracksDataException;
 
+    /**
+     * Gets the number of created buffers
+     *
+     * @return the number of created buffers
+     */
+    int getCreatedBuffersCount();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
index 876b3de..ed639f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
@@ -22,7 +22,9 @@
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 
+@NotThreadSafe
 public class ReadBufferFactory implements IBufferFactory {
 
     private final int limit;
@@ -44,4 +46,9 @@
             return frame;
         }
     }
+
+    @Override
+    public int getCreatedBuffersCount() {
+        return counter;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 2a6bdae..260c6b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -35,8 +35,8 @@
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.util.annotations.GuardedBy;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
-import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -45,7 +45,7 @@
  * An implementation of IJobQueue that gives more priority to jobs that are submitted earlier.
  */
 @NotThreadSafe
-@ThreadSafetyGuaranteedBy("JobManager")
+@GuardedBy("JobManager")
 public class FIFOJobQueue implements IJobQueue {
 
     private static final Logger LOGGER = LogManager.getLogger();
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
index 31cb69f..d81517d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelReadInterface;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.util.annotations.GuardedBy;
 
 public abstract class AbstractChannelReadInterface implements IChannelReadInterface {
 
@@ -31,6 +32,7 @@
     protected IBufferAcceptor emptyBufferAcceptor;
     protected ByteBuffer currentReadBuffer;
     protected IBufferFactory bufferFactory;
+    @GuardedBy("MultiplexConnection")
     protected volatile int credits;
 
     @Override
@@ -58,11 +60,13 @@
     }
 
     @Override
+    @GuardedBy("MultiplexConnection")
     public int getCredits() {
         return credits;
     }
 
     @Override
+    @GuardedBy("MultiplexConnection")
     public void setReadCredits(int credits) {
         this.credits = credits;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 5c927f9..741ca8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.comm.IChannelWriteInterface;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.util.annotations.GuardedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,6 +43,7 @@
     protected boolean channelWritabilityState;
     protected final int channelId;
     protected IBufferAcceptor eba;
+    @GuardedBy("ChannelControlBlock")
     protected int credits;
     protected boolean eos;
     protected boolean eosSent;
@@ -61,6 +63,7 @@
     }
 
     @Override
+    @GuardedBy("ChannelControlBlock")
     public void writeComplete() {
         if (currentWriteBuffer.remaining() <= 0) {
             currentWriteBuffer.clear();
@@ -70,6 +73,7 @@
         }
     }
 
+    @GuardedBy("ChannelControlBlock")
     private boolean computeWritability() {
         boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
         if (writableDataPresent) {
@@ -82,6 +86,7 @@
     }
 
     @Override
+    @GuardedBy("ChannelControlBlock")
     public void adjustChannelWritability() {
         boolean writable = computeWritability();
         if (writable) {
@@ -97,6 +102,7 @@
     }
 
     @Override
+    @GuardedBy("ChannelControlBlock")
     public void addCredits(int credit) {
         credits += credit;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 3f88630..4ca7f1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
 import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.GuardedBy;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -95,10 +96,12 @@
         return ri.read(sc, size);
     }
 
+    @GuardedBy("MultiplexConnection")
     int getReadCredits() {
         return ri.getCredits();
     }
 
+    @GuardedBy("MultiplexConnection")
     void setReadCredits(int credits) {
         ri.setReadCredits(credits);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index f32e6bf..9d7f848 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.util.annotations.GuardedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -36,6 +37,9 @@
     private final Deque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
     private final Object bufferRecycleLock = new Object();
+    private int frameSize;
+    private long recycledBuffers = 0;
+    private long flushedBuffers = 0;
 
     public FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
@@ -43,17 +47,22 @@
         credits = 0;
         emptyBufferAcceptor = buffer -> {
             final int delta = buffer.remaining();
+            if (delta != frameSize) {
+                LOGGER.warn("partial frame being recycled; expected size {}, actual size {}", frameSize, delta);
+            }
             synchronized (bufferRecycleLock) {
                 if (ccb.isRemotelyClosed()) {
                     return;
                 }
                 riEmptyStack.push(buffer);
+                recycledBuffers++;
                 ccb.addPendingCredits(delta);
             }
         };
     }
 
     @Override
+    @GuardedBy("ChannelControlBlock")
     public int read(ISocketChannel sc, int size) throws IOException, NetException {
         synchronized (bufferRecycleLock) {
             while (true) {
@@ -62,16 +71,12 @@
                 }
                 if (currentReadBuffer == null) {
                     currentReadBuffer = riEmptyStack.poll();
-                    //if current buffer == null and limit not reached
-                    // factory.createBuffer factory
                     if (currentReadBuffer == null) {
                         currentReadBuffer = bufferFactory.createBuffer();
                     }
                 }
                 if (currentReadBuffer == null) {
-                    if (LOGGER.isWarnEnabled()) {
-                        LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size());
-                    }
+                    logStats();
                     throw new IllegalStateException(ccb + " read buffers exceeded");
                 }
                 int rSize = Math.min(size, currentReadBuffer.remaining());
@@ -95,6 +100,7 @@
                 }
                 if (currentReadBuffer.remaining() <= 0) {
                     flush();
+                    flushedBuffers++;
                 }
             }
         }
@@ -102,7 +108,16 @@
 
     @Override
     public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+        this.frameSize = frameSize;
         super.setBufferFactory(bufferFactory, limit, frameSize);
         ccb.addPendingCredits(limit * frameSize);
     }
+
+    private void logStats() {
+        if (LOGGER.isWarnEnabled()) {
+            LOGGER.warn(
+                    "{} read buffers exceeded; current empty buffers: {}, created buffers: {}, recycled buffers: {}, flushed buffers: {}",
+                    ccb, riEmptyStack.size(), bufferFactory.getCreatedBuffersCount(), recycledBuffers, flushedBuffers);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 3f4618b..a7be3a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.comm.IConnectionWriterState;
 import org.apache.hyracks.api.comm.MuxDemuxCommand;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.util.annotations.GuardedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -35,6 +36,7 @@
     }
 
     @Override
+    @GuardedBy("ChannelControlBlock")
     public void write(IConnectionWriterState writerState) throws NetException {
         if (currentWriteBuffer == null) {
             currentWriteBuffer = wiFullQueue.poll();
@@ -43,6 +45,9 @@
             int size = Math.min(currentWriteBuffer.remaining(), credits);
             if (size > 0) {
                 credits -= size;
+                if (credits % currentWriteBuffer.capacity() != 0) {
+                    LOGGER.warn("partial frame being written on {}", ccb);
+                }
                 writerState.getCommand().setChannelId(channelId);
                 writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.DATA);
                 writerState.getCommand().setData(size);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 64c1a53..061c6eee 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
+import org.apache.hyracks.util.annotations.GuardedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -406,6 +406,7 @@
                         int channelId = readerState.command.getChannelId();
                         ccb = cSet.registerChannel(channelId);
                         muxDemux.getChannelOpenListener().channelOpened(ccb);
+                        break;
                     }
                 }
                 if (LOGGER.isTraceEnabled()) {
@@ -429,7 +430,7 @@
         return muxDemux.getChannelInterfaceFactory();
     }
 
-    @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+    @GuardedBy("MultiplexedConnection.this")
     private class EventCounter implements IEventCounter {
         private int counter;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
index ef6048d..2180d1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -176,5 +176,11 @@
             counter++;
             return ByteBuffer.allocate(frameSize);
         }
+
+        @Override
+        public int getCreatedBuffersCount() {
+            return counter;
+        }
+
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java
similarity index 92%
rename from hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java
rename to hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java
index 7ca1a94..0ce2727 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java
@@ -29,8 +29,8 @@
  * to be thread safe by {@link #value()}
  */
 @Documented
-@Target({ ElementType.TYPE, ElementType.METHOD })
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD })
 @Retention(RetentionPolicy.SOURCE)
-public @interface ThreadSafetyGuaranteedBy {
+public @interface GuardedBy {
     String value();
 }