[NO ISSUE][NET] Networking Fixes
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ensure partition requests received after job failure
are aborted to prevent leaked network channels.
- Do not send channel close after a channel write error
since the contract is to close the channel when remote
errors are received.
- Only remove closed outgoing connections from the
connection map so that new connections can be established;
incoming connections need to be reestablished by the
remote destination.
- Do not perform further operations on failed multiplexed
connections to avoid CancelledKeyException.
- Add test case for partition requests received after
job failure.
Change-Id: Idc45f47fdf0419bf75d461e16f028237a5143de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2613
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 126d9a4..3016a7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
public class NetworkOutputChannel implements IFrameWriter {
@@ -43,7 +44,7 @@
public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
this.ccb = ccb;
this.nBuffers = nBuffers;
- emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ emptyStack = new ArrayDeque<>(nBuffers);
ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
@@ -58,7 +59,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer destBuffer = null;
+ ByteBuffer destBuffer;
while (buffer.hasRemaining()) {
synchronized (this) {
while (true) {
@@ -76,6 +77,7 @@
try {
wait();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
}
}
@@ -94,7 +96,7 @@
@Override
public void fail() throws HyracksDataException {
- ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
}
@Override
@@ -103,7 +105,7 @@
}
public void abort() {
- ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
synchronized (NetworkOutputChannel.this) {
aborted = true;
NetworkOutputChannel.this.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d7ed47d..c962029 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -87,5 +87,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 76b5c8c..6a7d645 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -665,6 +665,7 @@
}
public void notifyTasksCompleted(CcId ccId) throws Exception {
+ partitionManager.jobsCompleted(ccId);
application.onRegisterNode(ccId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index f3276a4..cfe0991 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
@@ -129,12 +128,7 @@
LOGGER.debug("Received initial partition request: " + pid + " on channel: " + ccb);
}
noc = new NetworkOutputChannel(ccb, nBuffers);
- try {
- partitionManager.registerPartitionRequest(pid, noc);
- } catch (HyracksException e) {
- e.printStackTrace();
- noc.abort();
- }
+ partitionManager.registerPartitionRequest(pid, noc);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ee4a9e..d023ce9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,20 +19,22 @@
package org.apache.hyracks.control.nc.partitions;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.IPartition;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionState;
@@ -40,6 +42,9 @@
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
public class PartitionManager {
private final NodeControllerService ncs;
@@ -52,11 +57,14 @@
private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
+ private final Cache<JobId, JobId> failedJobsCache;
+
public PartitionManager(NodeControllerService ncs) {
this.ncs = ncs;
this.availablePartitionMap = new HashMap<>();
this.deallocatableRegistry = new DefaultDeallocatableRegistry();
this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+ failedJobsCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
}
public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
@@ -95,37 +103,20 @@
return availablePartitionMap.get(pid).get(0);
}
- public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
- for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
- .hasNext();) {
- Map.Entry<PartitionId, List<IPartition>> e = i.next();
- PartitionId pid = e.getKey();
- if (jobId.equals(pid.getJobId())) {
- for (IPartition p : e.getValue()) {
- unregisteredPartitions.add(p);
- }
- i.remove();
- }
+ public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
+ if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
+ writer.abort();
}
- }
-
- public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
- throws HyracksException {
- try {
- List<IPartition> pList = availablePartitionMap.get(partitionId);
- if (pList != null && !pList.isEmpty()) {
- IPartition partition = pList.get(0);
- writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
- partition.writeTo(writer);
- if (!partition.isReusable()) {
- availablePartitionMap.remove(partitionId);
- }
- } else {
- //throw new HyracksException("Request for unknown partition " + partitionId);
- partitionRequests.put(partitionId, writer);
+ List<IPartition> pList = availablePartitionMap.get(partitionId);
+ if (pList != null && !pList.isEmpty()) {
+ IPartition partition = pList.get(0);
+ writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
+ partition.writeTo(writer);
+ if (!partition.isReusable()) {
+ availablePartitionMap.remove(partitionId);
}
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ } else {
+ partitionRequests.put(partitionId, writer);
}
}
@@ -137,7 +128,25 @@
deallocatableRegistry.close();
}
- public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+ public synchronized void jobCompleted(JobId jobId, JobStatus status) {
+ if (status == JobStatus.FAILURE) {
+ failedJobsCache.put(jobId, jobId);
+ }
+ final List<IPartition> jobPartitions = unregisterPartitions(jobId);
+ final List<NetworkOutputChannel> pendingRequests = removePendingRequests(jobId, status);
+ if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
+ ncs.getExecutor().execute(() -> {
+ jobPartitions.forEach(IDeallocatable::deallocate);
+ pendingRequests.forEach(NetworkOutputChannel::abort);
+ });
+ }
+ }
+
+ public synchronized void jobsCompleted(CcId ccId) {
+ failedJobsCache.asMap().keySet().removeIf(jobId -> jobId.getCcId().equals(ccId));
+ }
+
+ private void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
PartitionState state) throws HyracksDataException {
PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
desc.setState(state);
@@ -147,4 +156,36 @@
throw HyracksDataException.create(e);
}
}
-}
+
+ private List<IPartition> unregisterPartitions(JobId jobId) {
+ final List<IPartition> unregisteredPartitions = new ArrayList<>();
+ for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+ .hasNext();) {
+ Map.Entry<PartitionId, List<IPartition>> entry = i.next();
+ PartitionId pid = entry.getKey();
+ if (jobId.equals(pid.getJobId())) {
+ unregisteredPartitions.addAll(entry.getValue());
+ i.remove();
+ }
+ }
+ return unregisteredPartitions;
+ }
+
+ private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+ if (status != JobStatus.FAILURE) {
+ return Collections.emptyList();
+ }
+ final List<NetworkOutputChannel> pendingRequests = new ArrayList<>();
+ final Iterator<Map.Entry<PartitionId, NetworkOutputChannel>> requestsIterator =
+ partitionRequests.entrySet().iterator();
+ while (requestsIterator.hasNext()) {
+ final Map.Entry<PartitionId, NetworkOutputChannel> entry = requestsIterator.next();
+ final PartitionId partitionId = entry.getKey();
+ if (partitionId.getJobId().equals(jobId)) {
+ pendingRequests.add(entry.getValue());
+ requestsIterator.remove();
+ }
+ }
+ return pendingRequests;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index d38cd5e..c5a9d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,17 +18,13 @@
*/
package org.apache.hyracks.control.nc.work;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.IPartition;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -53,23 +49,7 @@
LOGGER.info("Cleaning up after job: " + jobId);
}
ncs.removeJobParameterByteStore(jobId);
- final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
- ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
- ncs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- for (IPartition p : unregisteredPartitions) {
- try {
- // Put deallocate in a try block to make sure that every IPartition is de-allocated.
- p.deallocate();
- } catch (Exception e) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.log(Level.WARN, e.getMessage(), e);
- }
- }
- }
- }
- });
+ ncs.getPartitionManager().jobCompleted(jobId, status);;
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet joblet = jobletMap.remove(jobId);
if (joblet != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0b548f6..28c1a71 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,6 +31,7 @@
public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
+ public static final int REMOTE_WRITE_ERROR_CODE = 1;
private static final Logger LOGGER = LogManager.getLogger();
protected final IChannelControlBlock ccb;
protected final Queue<ByteBuffer> wiFullQueue;
@@ -135,7 +136,9 @@
return;
}
eos = true;
- adjustChannelWritability();
+ if (ecode != REMOTE_WRITE_ERROR_CODE) {
+ adjustChannelWritability();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 286320b..a7fa49e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
+import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,28 +68,7 @@
MultiplexedConnection(MuxDemux muxDemux) {
this.muxDemux = muxDemux;
- pendingWriteEventsCounter = new IEventCounter() {
- private int counter;
-
- @Override
- public synchronized void increment() {
- ++counter;
- if (counter == 1) {
- tcpConnection.enable(SelectionKey.OP_WRITE);
- }
- }
-
- @Override
- public synchronized void decrement() {
- --counter;
- if (counter == 0) {
- tcpConnection.disable(SelectionKey.OP_WRITE);
- }
- if (counter < 0) {
- throw new IllegalStateException();
- }
- }
- };
+ pendingWriteEventsCounter = new EventCounter();
cSet = new ChannelSet(this, pendingWriteEventsCounter);
readerState = new ReaderState();
writerState = new WriterState();
@@ -429,4 +409,32 @@
public IChannelInterfaceFactory getChannelInterfaceFactory() {
return muxDemux.getChannelInterfaceFactory();
}
+
+ @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+ private class EventCounter implements IEventCounter {
+ private int counter;
+
+ @Override
+ public synchronized void increment() {
+ if (!connectionFailure) {
+ ++counter;
+ if (counter == 1) {
+ tcpConnection.enable(SelectionKey.OP_WRITE);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void decrement() {
+ if (!connectionFailure) {
+ --counter;
+ if (counter == 0) {
+ tcpConnection.disable(SelectionKey.OP_WRITE);
+ }
+ if (counter < 0) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index c12909c..c58cb86 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -111,7 +111,9 @@
@Override
public void connectionClosed(TCPConnection connection) {
synchronized (MuxDemux.this) {
- connectionMap.remove(connection.getRemoteAddress());
+ if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
+ connectionMap.remove(connection.getRemoteAddress());
+ }
}
}
}, nThreads);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index b0e2eed..ff4627a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -29,6 +29,11 @@
public class TCPConnection {
+ public enum ConnectionType {
+ INCOMING,
+ OUTGOING
+ }
+
private static final Logger LOGGER = LogManager.getLogger();
private final TCPEndpoint endpoint;
@@ -43,11 +48,15 @@
private Object attachment;
- public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+ private ConnectionType type;
+
+ public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+ ConnectionType type) {
this.endpoint = endpoint;
this.channel = channel;
this.key = key;
this.selector = selector;
+ this.type = type;
remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
}
@@ -102,6 +111,10 @@
}
}
+ public ConnectionType getType() {
+ return type;
+ }
+
@Override
public String toString() {
return "TCPConnection[Remote Address: " + remoteAddress + " Local Address: " + endpoint.getLocalAddress() + "]";
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index affa59e..05e2175 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.net.protocols.tcp;
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -160,7 +162,8 @@
for (SocketChannel channel : workingIncomingConnections) {
register(channel);
SelectionKey sKey = channel.register(selector, 0);
- TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+ TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
+ ConnectionType.INCOMING);
sKey.attach(connection);
synchronized (connectionListener) {
connectionListener.acceptedConnection(connection);
@@ -220,7 +223,8 @@
}
private void createConnection(SelectionKey key, SocketChannel channel) {
- TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+ TCPConnection connection =
+ new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
key.attach(connection);
key.interestOps(0);
synchronized (connectionListener) {