[NO ISSUE][NET] Propagate Network Failure Error Codes
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Pass the network error code to the channel
to determine whether the error should be
sent to the receiver or not.
Change-Id: I29cffa916df9fecc9942f449ba6c5ca7c84b43b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2912
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 8bee56e..334fb5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -104,8 +104,8 @@
ccb.getWriteInterface().getFullBufferAcceptor().close();
}
- public void abort() {
- ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ public void abort(int ecode) {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(ecode);
synchronized (NetworkOutputChannel.this) {
aborted = true;
NetworkOutputChannel.this.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 98693d0..8b02f9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -139,7 +139,7 @@
@Override
public void error(int ecode) {
if (noc != null) {
- noc.abort();
+ noc.abort(ecode);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index 6927a58..ee821d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
@@ -146,7 +147,7 @@
partitionManager.initializeResultPartitionReader(jobId, rsId, partition, noc);
} catch (HyracksException e) {
LOGGER.warn("Failed to initialize result partition reader", e);
- noc.abort();
+ noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
}
}
@@ -158,7 +159,7 @@
@Override
public void error(int ecode) {
if (noc != null) {
- noc.abort();
+ noc.abort(ecode);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index d023ce9..9ac7168 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -105,7 +106,7 @@
public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
- writer.abort();
+ writer.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
}
List<IPartition> pList = availablePartitionMap.get(partitionId);
if (pList != null && !pList.isEmpty()) {
@@ -137,7 +138,8 @@
if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
ncs.getExecutor().execute(() -> {
jobPartitions.forEach(IDeallocatable::deallocate);
- pendingRequests.forEach(NetworkOutputChannel::abort);
+ pendingRequests.forEach(networkOutputChannel -> networkOutputChannel
+ .abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE));
});
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
index 39cdc1e..3774530 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -82,7 +83,7 @@
}
} catch (Exception e) {
LOGGER.error(() -> "failed to send result partition " + resultState.getResultSetPartitionId(), e);
- channel.abort();
+ channel.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
} finally {
close();
}