[NO ISSUE][NET] Ensure CLOSE Is Not Sent After Channel ERROR

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently it is possible to send network channel
  CLOSE command after a channel ERROR was sent. When this
  happens and the channel was recycled to be reused
  on the receiver side, the CLOSE command will result
  in NPE. There is no need to send a CLOSE command
  after an ERROR command because when an ERROR command
  is received, it is treated as ERROR + CLOSE on the
  receiver side.
- Avoid registering partition requests for failed jobs.

Change-Id: I17a769a46f4d13220adb22dd255e56dc4ccc458d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2954
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
index 9ad870c..84f7831 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
@@ -54,7 +54,7 @@
             ecodeSent = true;
             ccb.reportLocalEOS();
             adjustChannelWritability();
-        } else if (eos && !eosSent) {
+        } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
             writerState.getCommand().setData(0);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ac7168..7c8fb34 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -107,6 +107,7 @@
     public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
         if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
             writer.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return;
         }
         List<IPartition> pList = availablePartitionMap.get(partitionId);
         if (pList != null && !pList.isEmpty()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0a28e93..5c927f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,7 +75,7 @@
         if (writableDataPresent) {
             return credits > 0;
         }
-        if (eos && !eosSent) {
+        if (isPendingCloseWrite()) {
             return true;
         }
         return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
@@ -116,6 +116,10 @@
         return credits;
     }
 
+    protected boolean isPendingCloseWrite() {
+        return eos && !eosSent && !ecodeSent;
+    }
+
     private class CloseableBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 628007d..3f4618b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -58,7 +58,7 @@
             ecodeSent = true;
             ccb.reportLocalEOS();
             adjustChannelWritability();
-        } else if (eos && !eosSent) {
+        } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
             writerState.getCommand().setData(0);