[NO ISSUE][NET] Allow Data Receivers To Report Errors

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- When an error is encountered while reading a result,
  report an error to the node sending the data to allow
  it to abort the operation.
- Allow FullFrameChannelWriteInterface to report errors
  even when some data is still pending to be sent.
- Add test case to ensure result senders are terminated.

Change-Id: Ie7fba6760edb498b88112a7a68b1d0b9f08022b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5323
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
new file mode 100644
index 0000000..3a4823f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.hyracks.control.nc.result.ResultPartitionReader;
+import org.apache.hyracks.util.Span;
+import org.apache.hyracks.util.ThreadDumpUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ResultStreamingFailureTest {
+
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        integrationUtil.init(true, AsterixHyracksIntegrationUtil.DEFAULT_CONF_FILE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void resultStreamingFailureTest() throws Exception {
+        queryAndDropConnection();
+        // allow result sender to terminate and ensure no leaks
+        Span timeout = Span.start(5, TimeUnit.SECONDS);
+        while (!timeout.elapsed()) {
+            String threadDump = ThreadDumpUtil.takeDumpString();
+            if (!threadDump.contains(ResultPartitionReader.class.getName())) {
+                return;
+            }
+            TimeUnit.SECONDS.sleep(1);
+        }
+        throw new AssertionError("found leaking senders in:\n" + ThreadDumpUtil.takeDumpString());
+    }
+
+    private void queryAndDropConnection() throws IOException {
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            final List<NameValuePair> params = new ArrayList<>();
+            params.add(new BasicNameValuePair("statement", "select * from range(1, 10000000) r;"));
+            HttpPost request = new HttpPost("http://localhost:19004/query/service");
+            request.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
+            CloseableHttpResponse response = httpClient.execute(request);
+            Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+            // close connection without streaming the result
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
index 4deba7b..7baf268 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
@@ -37,4 +37,9 @@
     public void open(IHyracksCommonContext ctx) throws HyracksDataException;
 
     public void close() throws HyracksDataException;
+
+    /**
+     * Called when a failure is encountered while reading data
+     */
+    void fail();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index c8f5bd9..3c5a3d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -90,36 +90,43 @@
     public int read(IFrame frame) throws HyracksDataException {
         frame.reset();
         int readSize = 0;
-        if (isFirstRead() && !hasNextRecord()) {
-            return readSize;
-        }
-        // read until frame is full or all result records have been read
-        while (readSize < frame.getFrameSize()) {
-            if (currentRecordMonitor.hasMoreFrames()) {
-                final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
-                if (readBuffer == null) {
-                    throw new IllegalStateException("Unexpected empty frame");
-                }
-                currentRecordMonitor.notifyFrameRead();
-                if (readSize == 0) {
-                    final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
-                    frame.ensureFrameSize(frame.getMinSize() * nBlocks);
-                    frame.getBuffer().clear();
-                }
-                frame.getBuffer().put(readBuffer);
-                currentRecordChannel.recycleBuffer(readBuffer);
-                readSize = frame.getBuffer().position();
-            } else {
-                currentRecordChannel.close();
-                if (currentRecordMonitor.failed()) {
-                    throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
-                }
-                if (isLastRecord() || !hasNextRecord()) {
-                    break;
+        try {
+            if (isFirstRead() && !hasNextRecord()) {
+                return readSize;
+            }
+            // read until frame is full or all result records have been read
+            while (readSize < frame.getFrameSize()) {
+                if (currentRecordMonitor.hasMoreFrames()) {
+                    final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
+                    if (readBuffer == null) {
+                        throw new IllegalStateException("Unexpected empty frame");
+                    }
+                    currentRecordMonitor.notifyFrameRead();
+                    if (readSize == 0) {
+                        final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
+                        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+                        frame.getBuffer().clear();
+                    }
+                    frame.getBuffer().put(readBuffer);
+                    currentRecordChannel.recycleBuffer(readBuffer);
+                    readSize = frame.getBuffer().position();
+                } else {
+                    currentRecordChannel.close();
+                    if (currentRecordMonitor.failed()) {
+                        throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+                    }
+                    if (isLastRecord() || !hasNextRecord()) {
+                        break;
+                    }
                 }
             }
+            frame.getBuffer().flip();
+        } catch (Exception e) {
+            if (isLocalFailure()) {
+                currentRecordChannel.fail();
+            }
+            throw e;
         }
-        frame.getBuffer().flip();
         return readSize;
     }
 
@@ -201,6 +208,10 @@
         return knownRecords != null && currentRecord == knownRecords.length - 1;
     }
 
+    private boolean isLocalFailure() {
+        return currentRecordMonitor != null && !currentRecordMonitor.failed();
+    }
+
     private static class ResultInputChannelMonitor implements IInputChannelMonitor {
 
         private int availableFrames;
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 58664c6..53bb7cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -119,6 +119,11 @@
 
     }
 
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
     private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
index 1df39e9..38cf7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -127,6 +128,11 @@
 
     }
 
+    @Override
+    public void fail() {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+    }
+
     private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 83677f8..2a40eb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -99,6 +99,11 @@
 
     }
 
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
     private class FrameWriter implements IFrameWriter {
         @Override
         public void open() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 741ca8c..ee19de3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,14 +75,14 @@
 
     @GuardedBy("ChannelControlBlock")
     private boolean computeWritability() {
-        boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+        if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+            return true;
+        }
+        boolean writableDataPresent = !ecodeSent && (currentWriteBuffer != null || !wiFullQueue.isEmpty());
         if (writableDataPresent) {
             return credits > 0;
         }
-        if (isPendingCloseWrite()) {
-            return true;
-        }
-        return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
+        return isPendingCloseWrite();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index a7be3a6..e542a34 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -41,7 +41,15 @@
         if (currentWriteBuffer == null) {
             currentWriteBuffer = wiFullQueue.poll();
         }
-        if (currentWriteBuffer != null) {
+        if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+            writerState.getCommand().setChannelId(channelId);
+            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+            writerState.getCommand().setData(ecode.get());
+            writerState.reset(null, 0, null);
+            ecodeSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        } else if (currentWriteBuffer != null) {
             int size = Math.min(currentWriteBuffer.remaining(), credits);
             if (size > 0) {
                 credits -= size;
@@ -55,14 +63,6 @@
             } else {
                 adjustChannelWritability();
             }
-        } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
-            writerState.getCommand().setChannelId(channelId);
-            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
-            writerState.getCommand().setData(REMOTE_ERROR_CODE);
-            writerState.reset(null, 0, null);
-            ecodeSent = true;
-            ccb.reportLocalEOS();
-            adjustChannelWritability();
         } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);