[NO ISSUE][NET] Networking Fixes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure partition requests received after job failure
  are aborted to prevent leaked network channels.
- Do not send channel close after channel write error
  since the contract is to close the channel when remote
  errors are received.
- When a connection closes, only remove it from the
  connection map if it is outgoing, so that a new one can
  be established; incoming connections need to be
  reestablished by the remote destination.
- Do not perform further operations on failed multiplexed
  connections to avoid CancelledKeyException.
- Add test case for received partition requests after
  job failure.

Change-Id: Idc45f47fdf0419bf75d461e16f028237a5143de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2613
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 126d9a4..3016a7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkOutputChannel implements IFrameWriter {
@@ -43,7 +44,7 @@
     public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
         this.ccb = ccb;
         this.nBuffers = nBuffers;
-        emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+        emptyStack = new ArrayDeque<>(nBuffers);
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
     }
 
@@ -58,7 +59,7 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer destBuffer = null;
+        ByteBuffer destBuffer;
         while (buffer.hasRemaining()) {
             synchronized (this) {
                 while (true) {
@@ -76,6 +77,7 @@
                     try {
                         wait();
                     } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         throw HyracksDataException.create(e);
                     }
                 }
@@ -94,7 +96,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
     }
 
     @Override
@@ -103,7 +105,7 @@
     }
 
     public void abort() {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
         synchronized (NetworkOutputChannel.this) {
             aborted = true;
             NetworkOutputChannel.this.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d7ed47d..c962029 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -87,5 +87,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 76b5c8c..6a7d645 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -665,6 +665,7 @@
     }
 
     public void notifyTasksCompleted(CcId ccId) throws Exception {
+        partitionManager.jobsCompleted(ccId);
         application.onRegisterNode(ccId);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index f3276a4..cfe0991 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -129,12 +128,7 @@
                 LOGGER.debug("Received initial partition request: " + pid + " on channel: " + ccb);
             }
             noc = new NetworkOutputChannel(ccb, nBuffers);
-            try {
-                partitionManager.registerPartitionRequest(pid, noc);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-                noc.abort();
-            }
+            partitionManager.registerPartitionRequest(pid, noc);
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ee4a9e..d023ce9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,20 +19,22 @@
 package org.apache.hyracks.control.nc.partitions;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionState;
@@ -40,6 +42,9 @@
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 public class PartitionManager {
 
     private final NodeControllerService ncs;
@@ -52,11 +57,14 @@
 
     private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
 
+    private final Cache<JobId, JobId> failedJobsCache;
+
     public PartitionManager(NodeControllerService ncs) {
         this.ncs = ncs;
         this.availablePartitionMap = new HashMap<>();
         this.deallocatableRegistry = new DefaultDeallocatableRegistry();
         this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+        failedJobsCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
     }
 
     public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
@@ -95,37 +103,20 @@
         return availablePartitionMap.get(pid).get(0);
     }
 
-    public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
-        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
-                .hasNext();) {
-            Map.Entry<PartitionId, List<IPartition>> e = i.next();
-            PartitionId pid = e.getKey();
-            if (jobId.equals(pid.getJobId())) {
-                for (IPartition p : e.getValue()) {
-                    unregisteredPartitions.add(p);
-                }
-                i.remove();
-            }
+    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
+        if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
+            writer.abort();
         }
-    }
-
-    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
-            throws HyracksException {
-        try {
-            List<IPartition> pList = availablePartitionMap.get(partitionId);
-            if (pList != null && !pList.isEmpty()) {
-                IPartition partition = pList.get(0);
-                writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
-                partition.writeTo(writer);
-                if (!partition.isReusable()) {
-                    availablePartitionMap.remove(partitionId);
-                }
-            } else {
-                //throw new HyracksException("Request for unknown partition " + partitionId);
-                partitionRequests.put(partitionId, writer);
+        List<IPartition> pList = availablePartitionMap.get(partitionId);
+        if (pList != null && !pList.isEmpty()) {
+            IPartition partition = pList.get(0);
+            writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
+            partition.writeTo(writer);
+            if (!partition.isReusable()) {
+                availablePartitionMap.remove(partitionId);
             }
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        } else {
+            partitionRequests.put(partitionId, writer);
         }
     }
 
@@ -137,7 +128,25 @@
         deallocatableRegistry.close();
     }
 
-    public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+    public synchronized void jobCompleted(JobId jobId, JobStatus status) {
+        if (status == JobStatus.FAILURE) {
+            failedJobsCache.put(jobId, jobId);
+        }
+        final List<IPartition> jobPartitions = unregisterPartitions(jobId);
+        final List<NetworkOutputChannel> pendingRequests = removePendingRequests(jobId, status);
+        if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
+            ncs.getExecutor().execute(() -> {
+                jobPartitions.forEach(IDeallocatable::deallocate);
+                pendingRequests.forEach(NetworkOutputChannel::abort);
+            });
+        }
+    }
+
+    public synchronized void jobsCompleted(CcId ccId) {
+        failedJobsCache.asMap().keySet().removeIf(jobId -> jobId.getCcId().equals(ccId));
+    }
+
+    private void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
             PartitionState state) throws HyracksDataException {
         PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
         desc.setState(state);
@@ -147,4 +156,36 @@
             throw HyracksDataException.create(e);
         }
     }
-}
+
+    private List<IPartition> unregisterPartitions(JobId jobId) {
+        final List<IPartition> unregisteredPartitions = new ArrayList<>();
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+                .hasNext();) {
+            Map.Entry<PartitionId, List<IPartition>> entry = i.next();
+            PartitionId pid = entry.getKey();
+            if (jobId.equals(pid.getJobId())) {
+                unregisteredPartitions.addAll(entry.getValue());
+                i.remove();
+            }
+        }
+        return unregisteredPartitions;
+    }
+
+    private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+        if (status != JobStatus.FAILURE) {
+            return Collections.emptyList();
+        }
+        final List<NetworkOutputChannel> pendingRequests = new ArrayList<>();
+        final Iterator<Map.Entry<PartitionId, NetworkOutputChannel>> requestsIterator =
+                partitionRequests.entrySet().iterator();
+        while (requestsIterator.hasNext()) {
+            final Map.Entry<PartitionId, NetworkOutputChannel> entry = requestsIterator.next();
+            final PartitionId partitionId = entry.getKey();
+            if (partitionId.getJobId().equals(jobId)) {
+                pendingRequests.add(entry.getValue());
+                requestsIterator.remove();
+            }
+        }
+        return pendingRequests;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index d38cd5e..c5a9d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,17 +18,13 @@
  */
 package org.apache.hyracks.control.nc.work;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -53,23 +49,7 @@
             LOGGER.info("Cleaning up after job: " + jobId);
         }
         ncs.removeJobParameterByteStore(jobId);
-        final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
-        ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
-        ncs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                for (IPartition p : unregisteredPartitions) {
-                    try {
-                        // Put deallocate in a try block to make sure that every IPartition is de-allocated.
-                        p.deallocate();
-                    } catch (Exception e) {
-                        if (LOGGER.isWarnEnabled()) {
-                            LOGGER.log(Level.WARN, e.getMessage(), e);
-                        }
-                    }
-                }
-            }
-        });
+        ncs.getPartitionManager().jobCompleted(jobId, status);
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0b548f6..28c1a71 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,6 +31,7 @@
 
 public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
 
+    public static final int REMOTE_WRITE_ERROR_CODE = 1;
     private static final Logger LOGGER = LogManager.getLogger();
     protected final IChannelControlBlock ccb;
     protected final Queue<ByteBuffer> wiFullQueue;
@@ -135,7 +136,9 @@
                     return;
                 }
                 eos = true;
-                adjustChannelWritability();
+                if (ecode != REMOTE_WRITE_ERROR_CODE) {
+                    adjustChannelWritability();
+                }
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 286320b..a7fa49e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
+import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -67,28 +68,7 @@
 
     MultiplexedConnection(MuxDemux muxDemux) {
         this.muxDemux = muxDemux;
-        pendingWriteEventsCounter = new IEventCounter() {
-            private int counter;
-
-            @Override
-            public synchronized void increment() {
-                ++counter;
-                if (counter == 1) {
-                    tcpConnection.enable(SelectionKey.OP_WRITE);
-                }
-            }
-
-            @Override
-            public synchronized void decrement() {
-                --counter;
-                if (counter == 0) {
-                    tcpConnection.disable(SelectionKey.OP_WRITE);
-                }
-                if (counter < 0) {
-                    throw new IllegalStateException();
-                }
-            }
-        };
+        pendingWriteEventsCounter = new EventCounter();
         cSet = new ChannelSet(this, pendingWriteEventsCounter);
         readerState = new ReaderState();
         writerState = new WriterState();
@@ -429,4 +409,32 @@
     public IChannelInterfaceFactory getChannelInterfaceFactory() {
         return muxDemux.getChannelInterfaceFactory();
     }
+
+    @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+    private class EventCounter implements IEventCounter {
+        private int counter;
+
+        @Override
+        public synchronized void increment() {
+            if (!connectionFailure) {
+                ++counter;
+                if (counter == 1) {
+                    tcpConnection.enable(SelectionKey.OP_WRITE);
+                }
+            }
+        }
+
+        @Override
+        public synchronized void decrement() {
+            if (!connectionFailure) {
+                --counter;
+                if (counter == 0) {
+                    tcpConnection.disable(SelectionKey.OP_WRITE);
+                }
+                if (counter < 0) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index c12909c..c58cb86 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -111,7 +111,9 @@
             @Override
             public void connectionClosed(TCPConnection connection) {
                 synchronized (MuxDemux.this) {
-                    connectionMap.remove(connection.getRemoteAddress());
+                    if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
+                        connectionMap.remove(connection.getRemoteAddress());
+                    }
                 }
             }
         }, nThreads);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index b0e2eed..ff4627a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -29,6 +29,11 @@
 
 public class TCPConnection {
 
+    public enum ConnectionType {
+        INCOMING,
+        OUTGOING
+    }
+
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final TCPEndpoint endpoint;
@@ -43,11 +48,15 @@
 
     private Object attachment;
 
-    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+    private ConnectionType type;
+
+    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+            ConnectionType type) {
         this.endpoint = endpoint;
         this.channel = channel;
         this.key = key;
         this.selector = selector;
+        this.type = type;
         remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
     }
 
@@ -102,6 +111,10 @@
         }
     }
 
+    public ConnectionType getType() {
+        return type;
+    }
+
     @Override
     public String toString() {
         return "TCPConnection[Remote Address: " + remoteAddress + " Local Address: " + endpoint.getLocalAddress() + "]";
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index affa59e..05e2175 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.net.protocols.tcp;
 
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -160,7 +162,8 @@
                         for (SocketChannel channel : workingIncomingConnections) {
                             register(channel);
                             SelectionKey sKey = channel.register(selector, 0);
-                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
+                                    ConnectionType.INCOMING);
                             sKey.attach(connection);
                             synchronized (connectionListener) {
                                 connectionListener.acceptedConnection(connection);
@@ -220,7 +223,8 @@
         }
 
         private void createConnection(SelectionKey key, SocketChannel channel) {
-            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+            TCPConnection connection =
+                    new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
             key.attach(connection);
             key.interestOps(0);
             synchronized (connectionListener) {