[NO ISSUE][NET] Networking Fixes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure partition requests received after job failure
  are aborted to prevent leaked network channels.
- Do not send channel close after channel write error
  since the contract is to close the channel when remote
  errors are received.
- Only remove closed outgoing connections to establish
  new connections since incoming connections need to be
  reestablished by the remote destination.
- Do not perform further operations on failed multiplexed
  connections to avoid CancelledKeyException.
- Add test case for partition requests received after
  job failure.

Change-Id: Idc45f47fdf0419bf75d461e16f028237a5143de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2613
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d7ed47d..c962029 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -87,5 +87,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 76b5c8c..6a7d645 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -665,6 +665,7 @@
     }
 
     public void notifyTasksCompleted(CcId ccId) throws Exception {
+        partitionManager.jobsCompleted(ccId);
         application.onRegisterNode(ccId);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index f3276a4..cfe0991 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -129,12 +128,7 @@
                 LOGGER.debug("Received initial partition request: " + pid + " on channel: " + ccb);
             }
             noc = new NetworkOutputChannel(ccb, nBuffers);
-            try {
-                partitionManager.registerPartitionRequest(pid, noc);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-                noc.abort();
-            }
+            partitionManager.registerPartitionRequest(pid, noc);
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ee4a9e..d023ce9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,20 +19,22 @@
 package org.apache.hyracks.control.nc.partitions;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionState;
@@ -40,6 +42,9 @@
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 public class PartitionManager {
 
     private final NodeControllerService ncs;
@@ -52,11 +57,14 @@
 
     private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
 
+    private final Cache<JobId, JobId> failedJobsCache;
+
     public PartitionManager(NodeControllerService ncs) {
         this.ncs = ncs;
         this.availablePartitionMap = new HashMap<>();
         this.deallocatableRegistry = new DefaultDeallocatableRegistry();
         this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+        failedJobsCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
     }
 
     public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
@@ -95,37 +103,20 @@
         return availablePartitionMap.get(pid).get(0);
     }
 
-    public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
-        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
-                .hasNext();) {
-            Map.Entry<PartitionId, List<IPartition>> e = i.next();
-            PartitionId pid = e.getKey();
-            if (jobId.equals(pid.getJobId())) {
-                for (IPartition p : e.getValue()) {
-                    unregisteredPartitions.add(p);
-                }
-                i.remove();
-            }
+    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
+        if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
+            writer.abort();
         }
-    }
-
-    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
-            throws HyracksException {
-        try {
-            List<IPartition> pList = availablePartitionMap.get(partitionId);
-            if (pList != null && !pList.isEmpty()) {
-                IPartition partition = pList.get(0);
-                writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
-                partition.writeTo(writer);
-                if (!partition.isReusable()) {
-                    availablePartitionMap.remove(partitionId);
-                }
-            } else {
-                //throw new HyracksException("Request for unknown partition " + partitionId);
-                partitionRequests.put(partitionId, writer);
+        List<IPartition> pList = availablePartitionMap.get(partitionId);
+        if (pList != null && !pList.isEmpty()) {
+            IPartition partition = pList.get(0);
+            writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
+            partition.writeTo(writer);
+            if (!partition.isReusable()) {
+                availablePartitionMap.remove(partitionId);
             }
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        } else {
+            partitionRequests.put(partitionId, writer);
         }
     }
 
@@ -137,7 +128,25 @@
         deallocatableRegistry.close();
     }
 
-    public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+    public synchronized void jobCompleted(JobId jobId, JobStatus status) {
+        if (status == JobStatus.FAILURE) {
+            failedJobsCache.put(jobId, jobId);
+        }
+        final List<IPartition> jobPartitions = unregisterPartitions(jobId);
+        final List<NetworkOutputChannel> pendingRequests = removePendingRequests(jobId, status);
+        if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
+            ncs.getExecutor().execute(() -> {
+                jobPartitions.forEach(IDeallocatable::deallocate);
+                pendingRequests.forEach(NetworkOutputChannel::abort);
+            });
+        }
+    }
+
+    public synchronized void jobsCompleted(CcId ccId) {
+        failedJobsCache.asMap().keySet().removeIf(jobId -> jobId.getCcId().equals(ccId));
+    }
+
+    private void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
             PartitionState state) throws HyracksDataException {
         PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
         desc.setState(state);
@@ -147,4 +156,36 @@
             throw HyracksDataException.create(e);
         }
     }
-}
+
+    private List<IPartition> unregisterPartitions(JobId jobId) {
+        final List<IPartition> unregisteredPartitions = new ArrayList<>();
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+                .hasNext();) {
+            Map.Entry<PartitionId, List<IPartition>> entry = i.next();
+            PartitionId pid = entry.getKey();
+            if (jobId.equals(pid.getJobId())) {
+                unregisteredPartitions.addAll(entry.getValue());
+                i.remove();
+            }
+        }
+        return unregisteredPartitions;
+    }
+
+    private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+        if (status != JobStatus.FAILURE) {
+            return Collections.emptyList();
+        }
+        final List<NetworkOutputChannel> pendingRequests = new ArrayList<>();
+        final Iterator<Map.Entry<PartitionId, NetworkOutputChannel>> requestsIterator =
+                partitionRequests.entrySet().iterator();
+        while (requestsIterator.hasNext()) {
+            final Map.Entry<PartitionId, NetworkOutputChannel> entry = requestsIterator.next();
+            final PartitionId partitionId = entry.getKey();
+            if (partitionId.getJobId().equals(jobId)) {
+                pendingRequests.add(entry.getValue());
+                requestsIterator.remove();
+            }
+        }
+        return pendingRequests;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index d38cd5e..c5a9d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,17 +18,13 @@
  */
 package org.apache.hyracks.control.nc.work;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -53,23 +49,7 @@
             LOGGER.info("Cleaning up after job: " + jobId);
         }
         ncs.removeJobParameterByteStore(jobId);
-        final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
-        ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
-        ncs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                for (IPartition p : unregisteredPartitions) {
-                    try {
-                        // Put deallocate in a try block to make sure that every IPartition is de-allocated.
-                        p.deallocate();
-                    } catch (Exception e) {
-                        if (LOGGER.isWarnEnabled()) {
-                            LOGGER.log(Level.WARN, e.getMessage(), e);
-                        }
-                    }
-                }
-            }
-        });
+        ncs.getPartitionManager().jobCompleted(jobId, status);;
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {