[NO ISSUE][RT] Abort tasks on local network failures

- user model changes: no
- storage format changes: no
- interface changes: yes
  Add error code to IInputChannelMonitor.notifyFailure

Details:
- Previously, there was an assumption that all failures
  reported to an IInputChannelMonitor come from a remote
  task.
- This assumption is not always true and could lead
  to jobs hanging.
- To fix this, we report an error code indicating whether
  the failure is local or remote and if the failure is local
  then we fail the local task and report the failure to cc.

Change-Id: I7ea5b9008383faaac7c563671242b03919090b35
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2806
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
index 52509d3..559f49a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java
@@ -19,9 +19,9 @@
 package org.apache.hyracks.api.channels;
 
 public interface IInputChannelMonitor {
-    public void notifyFailure(IInputChannel channel);
+    void notifyFailure(IInputChannel channel, int errorCode);
 
-    public void notifyDataAvailability(IInputChannel channel, int nFrames);
+    void notifyDataAvailability(IInputChannel channel, int nFrames);
 
-    public void notifyEndOfStream(IInputChannel channel);
+    void notifyEndOfStream(IInputChannel channel);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b6d7cc7..09193d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -148,6 +148,7 @@
     public static final int CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT = 112;
     public static final int UNDEFINED_INVERTED_LIST_MERGE_TYPE = 113;
     public static final int NODE_IS_NOT_ACTIVE = 114;
+    public static final int LOCAL_NETWORK_ERROR = 115;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index bf73b91..c704d7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -131,6 +131,7 @@
 112 = Cannot add an element to an inverted-index search result.
 113 = Undefined inverted-list merge type: %1$s
 114 = Node (%1$s) is not active
+115 = Local network error
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
index 36c77ce..b1566f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
@@ -199,7 +199,7 @@
         }
 
         @Override
-        public synchronized void notifyFailure(IInputChannel channel) {
+        public synchronized void notifyFailure(IInputChannel channel, int errorCode) {
             failed = true;
             notifyAll();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
index 44c3d36..0f96a6e 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -141,7 +141,7 @@
 
         @Override
         public void error(int ecode) {
-            monitor.notifyFailure(DatasetNetworkInputChannel.this);
+            monitor.notifyFailure(DatasetNetworkInputChannel.this, ecode);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index a831492..7e893f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -133,7 +133,7 @@
 
         @Override
         public void error(int ecode) {
-            monitor.notifyFailure(NetworkInputChannel.this);
+            monitor.notifyFailure(NetworkInputChannel.this, ecode);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 3016a7a..8bee56e 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -96,7 +96,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
     }
 
     @Override
@@ -105,7 +105,7 @@
     }
 
     public void abort() {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
         synchronized (NetworkOutputChannel.this) {
             aborted = true;
             NetworkOutputChannel.this.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 23a5abb..340924d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -309,7 +309,8 @@
                                 if (!addPendingThread(thread)) {
                                     return;
                                 }
-                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setName(
+                                        displayName + ":" + joblet.getJobId() + ":" + taskAttemptId + ":" + cIdx);
                                 thread.setPriority(Thread.MIN_PRIORITY);
                                 try {
                                     pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index c902ad8..5ffed7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -103,7 +103,7 @@
         }
 
         @Override
-        public synchronized void notifyFailure(IInputChannel channel) {
+        public synchronized void notifyFailure(IInputChannel channel, int errorCode) {
             failed.set(true);
             notifyAll();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index c78c98c..0ff425f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -76,6 +76,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-net</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.e-movimento.tinytools</groupId>
       <artifactId>privilegedaccessor</artifactId>
       <version>1.2.2</version>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index bf9f575..5ce29c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -25,8 +25,10 @@
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,28 +40,35 @@
 
     private boolean eos;
 
-    private boolean failed;
+    private int errorCode;
 
     public InputChannelFrameReader(IInputChannel channel) {
         this.channel = channel;
         availableFrames = 0;
+        errorCode = AbstractChannelWriteInterface.NO_ERROR_CODE;
         eos = false;
-        failed = false;
     }
 
     @Override
     public void open() throws HyracksDataException {
     }
 
+    private boolean hasFailed() {
+        return errorCode != AbstractChannelWriteInterface.NO_ERROR_CODE;
+    }
+
     private synchronized boolean canGetNextBuffer() throws HyracksDataException {
-        while (!failed && !eos && availableFrames <= 0) {
+        while (!hasFailed() && !eos && availableFrames <= 0) {
             try {
                 wait();
             } catch (InterruptedException e) {
                 throw HyracksDataException.create(e);
             }
         }
-        if (failed) {
+        if (hasFailed()) {
+            if (errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE) {
+                throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR);
+            }
             // Do not throw exception here to allow the root cause exception gets propagated to the master first.
             // Return false to allow the nextFrame(...) call to be a non-op.
             LOGGER.warn("Sender failed.. returning silently");
@@ -96,8 +105,7 @@
 
         for (int i = 1; i < nBlocks; ++i) {
             if (!canGetNextBuffer()) {
-                throw new HyracksDataException(
-                        "InputChannelReader is waiting for the new frames, but the input stream is finished");
+                return false;
             }
             srcFrame = channel.getNextBuffer();
             frame.getBuffer().put(srcFrame);
@@ -116,8 +124,11 @@
     }
 
     @Override
-    public synchronized void notifyFailure(IInputChannel channel) {
-        failed = true;
+    public synchronized void notifyFailure(IInputChannel channel, int errorCode) {
+        // Note: if a remote failure overwrites the value of localFailure, then we rely on
+        // the fact that the remote task will notify the cc of the failure.
+        // Otherwise, the local task must fail
+        this.errorCode = errorCode;
         notifyAll();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 3c0a06b..5a1d5f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -23,8 +23,10 @@
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -47,6 +49,8 @@
 
     private int lastReadSender;
 
+    private boolean localFailure;
+
     public NonDeterministicChannelReader(int nSenderPartitions, BitSet expectedPartitions) {
         this.nSenderPartitions = nSenderPartitions;
         channels = new IInputChannel[nSenderPartitions];
@@ -107,6 +111,9 @@
             }
             if (!failSenders.isEmpty()) {
                 LOGGER.warn("Sender failed.. returning silently");
+                if (localFailure) {
+                    throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR);
+                }
                 // Do not throw exception here to allow the root cause exception gets propagated to the master first.
                 // Return a negative value to allow the nextFrame(...) call to be a non-op.
                 return -1;
@@ -141,11 +148,15 @@
     }
 
     @Override
-    public synchronized void notifyFailure(IInputChannel channel) {
+    public synchronized void notifyFailure(IInputChannel channel, int errorCode) {
         PartitionId pid = (PartitionId) channel.getAttachment();
         int senderIndex = pid.getSenderIndex();
         LOGGER.warn("Failure: " + pid.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: "
                 + pid.getReceiverIndex());
+        // Note: if a remote failure overwrites the value of localFailure, then we rely on
+        // the fact that the remote task will notify the cc of the failure.
+        // Otherwise, the local task must fail
+        localFailure = errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE;
         failSenders.set(senderIndex);
         eosSenders.set(senderIndex);
         notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
index ff8d451..31cb69f 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
@@ -42,10 +42,6 @@
         }
     }
 
-    public void reportError(int ecode) {
-        fba.error(ecode);
-    }
-
     @Override
     public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
         fba = fullBufferAcceptor;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 28c1a71..7ed9bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,7 +31,9 @@
 
 public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
 
-    public static final int REMOTE_WRITE_ERROR_CODE = 1;
+    public static final int NO_ERROR_CODE = 0;
+    public static final int REMOTE_ERROR_CODE = 1;
+    public static final int LOCAL_ERROR_CODE = -1;
     private static final Logger LOGGER = LogManager.getLogger();
     protected final IChannelControlBlock ccb;
     protected final Queue<ByteBuffer> wiFullQueue;
@@ -136,7 +138,7 @@
                     return;
                 }
                 eos = true;
-                if (ecode != REMOTE_WRITE_ERROR_CODE) {
+                if (ecode != REMOTE_ERROR_CODE) {
                     adjustChannelWritability();
                 }
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index c2fb9a8..49b9f7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -200,7 +200,7 @@
             for (int i = 0; i < ccbArray.length; ++i) {
                 ChannelControlBlock ccb = ccbArray[i];
                 if (ccb != null) {
-                    ccb.reportRemoteError(-1);
+                    ccb.reportRemoteError(AbstractChannelWriteInterface.LOCAL_ERROR_CODE);
                     markEOSAck(i);
                     unmarkPendingCredits(i);
                 }