[NO ISSUE][NET] Allow Data Receivers To Report Errors
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- When an error is encountered while reading a result,
report an error to the node sending the data to allow
it to abort the operation.
- Allow FullFrameChannelWriteInterface to report errors
even when some data is still pending to be sent.
- Add test case to ensure result senders are terminated.
Change-Id: Ie7fba6760edb498b88112a7a68b1d0b9f08022b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5323
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
new file mode 100644
index 0000000..3a4823f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.hyracks.control.nc.result.ResultPartitionReader;
+import org.apache.hyracks.util.Span;
+import org.apache.hyracks.util.ThreadDumpUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ResultStreamingFailureTest {
+
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+ @Before
+ public void setUp() throws Exception {
+ integrationUtil.init(true, AsterixHyracksIntegrationUtil.DEFAULT_CONF_FILE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void resultStreamingFailureTest() throws Exception {
+ queryAndDropConnection();
+ // allow result sender to terminate and ensure no leaks
+ Span timeout = Span.start(5, TimeUnit.SECONDS);
+ while (!timeout.elapsed()) {
+ String threadDump = ThreadDumpUtil.takeDumpString();
+ if (!threadDump.contains(ResultPartitionReader.class.getName())) {
+ return;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ throw new AssertionError("found leaking senders in:\n" + ThreadDumpUtil.takeDumpString());
+ }
+
+ private void queryAndDropConnection() throws IOException {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+ final List<NameValuePair> params = new ArrayList<>();
+ params.add(new BasicNameValuePair("statement", "select * from range(1, 10000000) r;"));
+ HttpPost request = new HttpPost("http://localhost:19004/query/service");
+ request.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
+ CloseableHttpResponse response = httpClient.execute(request);
+ Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+ // close connection without streaming the result
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
index 4deba7b..7baf268 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
@@ -37,4 +37,9 @@
public void open(IHyracksCommonContext ctx) throws HyracksDataException;
public void close() throws HyracksDataException;
+
+ /**
+ * Called when a failure is encountered while reading data
+ */
+ void fail();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index c8f5bd9..3c5a3d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -90,36 +90,43 @@
public int read(IFrame frame) throws HyracksDataException {
frame.reset();
int readSize = 0;
- if (isFirstRead() && !hasNextRecord()) {
- return readSize;
- }
- // read until frame is full or all result records have been read
- while (readSize < frame.getFrameSize()) {
- if (currentRecordMonitor.hasMoreFrames()) {
- final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
- if (readBuffer == null) {
- throw new IllegalStateException("Unexpected empty frame");
- }
- currentRecordMonitor.notifyFrameRead();
- if (readSize == 0) {
- final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
- frame.ensureFrameSize(frame.getMinSize() * nBlocks);
- frame.getBuffer().clear();
- }
- frame.getBuffer().put(readBuffer);
- currentRecordChannel.recycleBuffer(readBuffer);
- readSize = frame.getBuffer().position();
- } else {
- currentRecordChannel.close();
- if (currentRecordMonitor.failed()) {
- throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
- }
- if (isLastRecord() || !hasNextRecord()) {
- break;
+ try {
+ if (isFirstRead() && !hasNextRecord()) {
+ return readSize;
+ }
+ // read until frame is full or all result records have been read
+ while (readSize < frame.getFrameSize()) {
+ if (currentRecordMonitor.hasMoreFrames()) {
+ final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
+ if (readBuffer == null) {
+ throw new IllegalStateException("Unexpected empty frame");
+ }
+ currentRecordMonitor.notifyFrameRead();
+ if (readSize == 0) {
+ final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
+ frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+ frame.getBuffer().clear();
+ }
+ frame.getBuffer().put(readBuffer);
+ currentRecordChannel.recycleBuffer(readBuffer);
+ readSize = frame.getBuffer().position();
+ } else {
+ currentRecordChannel.close();
+ if (currentRecordMonitor.failed()) {
+ throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+ }
+ if (isLastRecord() || !hasNextRecord()) {
+ break;
+ }
}
}
+ frame.getBuffer().flip();
+ } catch (Exception e) {
+ if (isLocalFailure()) {
+ currentRecordChannel.fail();
+ }
+ throw e;
}
- frame.getBuffer().flip();
return readSize;
}
@@ -201,6 +208,10 @@
return knownRecords != null && currentRecord == knownRecords.length - 1;
}
+ private boolean isLocalFailure() {
+ return currentRecordMonitor != null && !currentRecordMonitor.failed();
+ }
+
private static class ResultInputChannelMonitor implements IInputChannelMonitor {
private int availableFrames;
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 58664c6..53bb7cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -119,6 +119,11 @@
}
+ @Override
+ public void fail() {
+ // do nothing (covered by job lifecycle)
+ }
+
private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
index 1df39e9..38cf7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -127,6 +128,11 @@
}
+ @Override
+ public void fail() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ }
+
private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 83677f8..2a40eb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -99,6 +99,11 @@
}
+ @Override
+ public void fail() {
+ // do nothing (covered by job lifecycle)
+ }
+
private class FrameWriter implements IFrameWriter {
@Override
public void open() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 741ca8c..ee19de3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,14 +75,14 @@
@GuardedBy("ChannelControlBlock")
private boolean computeWritability() {
- boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+ if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+ return true;
+ }
+ boolean writableDataPresent = !ecodeSent && (currentWriteBuffer != null || !wiFullQueue.isEmpty());
if (writableDataPresent) {
return credits > 0;
}
- if (isPendingCloseWrite()) {
- return true;
- }
- return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
+ return isPendingCloseWrite();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index a7be3a6..e542a34 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -41,7 +41,15 @@
if (currentWriteBuffer == null) {
currentWriteBuffer = wiFullQueue.poll();
}
- if (currentWriteBuffer != null) {
+ if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+ writerState.getCommand().setChannelId(channelId);
+ writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+ writerState.getCommand().setData(ecode.get());
+ writerState.reset(null, 0, null);
+ ecodeSent = true;
+ ccb.reportLocalEOS();
+ adjustChannelWritability();
+ } else if (currentWriteBuffer != null) {
int size = Math.min(currentWriteBuffer.remaining(), credits);
if (size > 0) {
credits -= size;
@@ -55,14 +63,6 @@
} else {
adjustChannelWritability();
}
- } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
- writerState.getCommand().setChannelId(channelId);
- writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
- writerState.getCommand().setData(REMOTE_ERROR_CODE);
- writerState.reset(null, 0, null);
- ecodeSent = true;
- ccb.reportLocalEOS();
- adjustChannelWritability();
} else if (isPendingCloseWrite()) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);