[NO ISSUE][NET] Skip Channel Write on Connection Failure
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Do not attempt to adjust channel writability if
the failure was caused by a lost connection, since
no more messages will be sent on that connection.
This prevents a possible deadlock between the
network IOThread that detected the connection failure
and another thread that might be accessing the channel.
- Make the conditions for sending an error code more explicit, since
only a single error code (REMOTE_ERROR_CODE) is currently ever sent.
Change-Id: Ic25f05ac2c0d02699324f2d1b80c51f392654106
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2892
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
index 43c1542..9ad870c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
@@ -46,10 +46,10 @@
} else {
adjustChannelWritability();
}
- } else if (ecode >= 0 && !ecodeSent) {
+ } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
- writerState.getCommand().setData(ecode);
+ writerState.getCommand().setData(REMOTE_ERROR_CODE);
writerState.reset(null, 0, null);
ecodeSent = true;
ccb.reportLocalEOS();
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 5ce29c2..f32adcc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -66,7 +66,7 @@
}
}
if (hasFailed()) {
- if (errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE) {
+ if (errorCode == AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE) {
throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR);
}
// Do not throw exception here to allow the root cause exception gets propagated to the master first.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index d8dc4b9..becbb00 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -158,7 +158,7 @@
// Note: if a remote failure overwrites the value of localFailure, then we rely on
// the fact that the remote task will notify the cc of the failure.
// Otherwise, the local task must fail
- localFailure = errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE;
+ localFailure = errorCode == AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE;
failSenders.set(senderIndex);
eosSenders.set(senderIndex);
notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 7ed9bfa..0a28e93 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -32,18 +33,18 @@
public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
public static final int NO_ERROR_CODE = 0;
+ public static final int CONNECTION_LOST_ERROR_CODE = -1;
public static final int REMOTE_ERROR_CODE = 1;
- public static final int LOCAL_ERROR_CODE = -1;
private static final Logger LOGGER = LogManager.getLogger();
protected final IChannelControlBlock ccb;
protected final Queue<ByteBuffer> wiFullQueue;
+ protected final AtomicInteger ecode = new AtomicInteger(NO_ERROR_CODE);
protected boolean channelWritabilityState;
protected final int channelId;
protected IBufferAcceptor eba;
protected int credits;
protected boolean eos;
protected boolean eosSent;
- protected int ecode;
protected boolean ecodeSent;
protected ByteBuffer currentWriteBuffer;
private final ICloseableBufferAcceptor fba;
@@ -56,7 +57,6 @@
credits = 0;
eos = false;
eosSent = false;
- ecode = -1;
ecodeSent = false;
}
@@ -78,10 +78,7 @@
if (eos && !eosSent) {
return true;
}
- if (ecode >= 0 && !ecodeSent) {
- return true;
- }
- return false;
+ return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
}
@Override
@@ -138,7 +135,7 @@
return;
}
eos = true;
- if (ecode != REMOTE_ERROR_CODE) {
+ if (ecode.get() != REMOTE_ERROR_CODE) {
adjustChannelWritability();
}
}
@@ -146,8 +143,11 @@
@Override
public void error(int ecode) {
+ AbstractChannelWriteInterface.this.ecode.set(ecode);
+ if (ecode == CONNECTION_LOST_ERROR_CODE) {
+ return;
+ }
synchronized (ccb) {
- AbstractChannelWriteInterface.this.ecode = ecode;
adjustChannelWritability();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index a546349..31a37ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -200,7 +200,7 @@
for (int i = 0; i < ccbArray.length; ++i) {
ChannelControlBlock ccb = ccbArray[i];
if (ccb != null) {
- ccb.reportRemoteError(AbstractChannelWriteInterface.LOCAL_ERROR_CODE);
+ ccb.reportRemoteError(AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE);
markEOSAck(i);
unmarkPendingCredits(i);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 17b70a8..628007d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -50,10 +50,10 @@
} else {
adjustChannelWritability();
}
- } else if (ecode >= 0 && !ecodeSent) {
+ } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
- writerState.getCommand().setData(ecode);
+ writerState.getCommand().setData(REMOTE_ERROR_CODE);
writerState.reset(null, 0, null);
ecodeSent = true;
ccb.reportLocalEOS();