Merge branch 'mad-hatter'
Change-Id: Ief0a2fe1f0cc29411eb52b52506a5bc1a2a3fd99
diff --git a/asterixdb/NOTICE b/asterixdb/NOTICE
index 7615782..b4729a8 100644
--- a/asterixdb/NOTICE
+++ b/asterixdb/NOTICE
@@ -1,5 +1,5 @@
Apache AsterixDB
-Copyright 2015-2019 The Apache Software Foundation
+Copyright 2015-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index e294510..9876eed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -75,10 +76,13 @@
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
} catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
response.setStatus(HttpResponseStatus.NOT_FOUND);
+ } catch (RejectedExecutionException e) {
+ // we must be shutting down, return 503
+ LOGGER.info("RejectedExecutionException while servicing request; returning 503", e);
+ sendError(response, HttpResponseStatus.SERVICE_UNAVAILABLE, null);
} catch (Exception e) {
- LOGGER.log(Level.INFO, "exception thrown for " + request, e);
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- responseWriter.write(e.toString());
+ LOGGER.warn("exception while servicing request; returning 500", e);
+ sendError(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString());
}
responseWriter.flush();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java
index 2fe9de8..f835c43 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java
@@ -52,9 +52,9 @@
Set<IPartitionReplica> getReplicas();
/**
- * Gets the location of this {@link IReplicationDestination}
+ * Gets the (resolved) location of this {@link IReplicationDestination}
*
- * @return the location
+ * @return the (resolved) location
*/
InetSocketAddress getLocation();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index f68ad09..c4bb74c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -20,11 +20,13 @@
import java.net.InetSocketAddress;
+import org.apache.hyracks.util.NetworkUtil;
+
public class ReplicaIdentifier {
private final int partition;
- private final InetSocketAddress location;
private final String id;
+ private volatile InetSocketAddress location;
private ReplicaIdentifier(int partition, InetSocketAddress location) {
this.partition = partition;
@@ -44,6 +46,12 @@
return location;
}
+ public InetSocketAddress refreshLocation() {
+ //noinspection NonAtomicOperationOnVolatileField
+ location = NetworkUtil.refresh(location); // rebuild from host string/port and publish the new address
+ return location; // re-reads the volatile field after the assignment above
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index e81c25a..f2d2496 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -97,7 +97,7 @@
public synchronized ISocketChannel getChannel() {
try {
if (!NetworkingUtil.isHealthy(sc)) {
- sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation());
+ establishReplicaConnection();
}
return sc;
} catch (IOException e) {
@@ -105,6 +105,15 @@
}
}
+ private void establishReplicaConnection() throws IOException {
+ try {
+ sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation()); // first try: stored address
+ } catch (Exception e) { // NOTE(review): e is neither logged nor attached; only a retry failure propagates
+ // try to re-resolve the address, in case our replica has had its IP address updated
+ sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.refreshLocation());
+ }
+ }
+
public synchronized void close() {
try {
if (sc != null) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index ef1bc28..782a801 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -32,6 +32,7 @@
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -39,15 +40,20 @@
private static final Logger LOGGER = LogManager.getLogger();
private final Set<IPartitionReplica> replicas = new HashSet<>();
- private final InetSocketAddress location;
+ private final InetSocketAddress inputLocation;
+ private InetSocketAddress resolvedLocation;
private ISocketChannel logRepChannel;
private ReplicationDestination(InetSocketAddress location) {
- this.location = location;
+ this.inputLocation = location; // address exactly as supplied; the field equals/hashCode compare
+ this.resolvedLocation = NetworkUtil.ensureResolved(location); // address handed out by getLocation
}
public static ReplicationDestination at(InetSocketAddress location) {
- return new ReplicationDestination(location);
+ if (!location.isUnresolved()) { // callers must pass a host/port pair that has not been looked up yet
+ throw new IllegalArgumentException("only unresolved addresses are allowed!");
+ }
+ return new ReplicationDestination(location); // unresolved input stays the identity; the constructor resolves it
}
@Override
@@ -79,7 +85,7 @@
public synchronized ISocketChannel getLogReplicationChannel(INcApplicationContext appCtx) {
try {
if (!NetworkingUtil.isHealthy(logRepChannel)) {
- logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, location);
+ establishReplicaConnection(appCtx);
}
return logRepChannel;
} catch (IOException e) {
@@ -87,6 +93,16 @@
}
}
+ protected void establishReplicaConnection(INcApplicationContext appCtx) throws IOException {
+ try {
+ logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
+ } catch (Exception e) { // NOTE(review): e is neither logged nor attached; only a retry failure propagates
+ // try to re-resolve the address, in case our replica has had its IP address updated
+ resolvedLocation = NetworkUtil.refresh(resolvedLocation); // keep the refreshed address for later calls
+ logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
+ }
+ }
+
private synchronized void closeLogReplicationChannel() {
try {
if (logRepChannel != null && logRepChannel.getSocketChannel().isOpen()) {
@@ -101,7 +117,7 @@
@Override
public InetSocketAddress getLocation() {
- return location;
+ return resolvedLocation; // NOTE(review): non-volatile and reassigned on reconnect; confirm visibility is fine
}
@Override
@@ -113,16 +129,16 @@
return false;
}
ReplicationDestination that = (ReplicationDestination) o;
- return Objects.equals(location, that.location);
+ return Objects.equals(inputLocation, that.inputLocation);
}
@Override
public String toString() {
- return location.toString();
+ return resolvedLocation.toString(); // prints the resolved address rather than the input one
}
@Override
public int hashCode() {
- return Objects.hash(location);
+ return Objects.hash(inputLocation); // hashes the same field that equals compares
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index bd99ec4..7ed674e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.replication.api.ReplicationDestination;
import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -60,7 +61,7 @@
@Override
public void register(IPartitionReplica replica) {
synchronized (dests) {
- final InetSocketAddress location = replica.getIdentifier().getLocation();
+ final InetSocketAddress location = NetworkUtil.ensureUnresolved(replica.getIdentifier().getLocation());
final ReplicationDestination replicationDest = dests.computeIfAbsent(location, ReplicationDestination::at);
replicationDest.add(replica);
logReplicationManager.register(replicationDest);
@@ -71,7 +72,7 @@
@Override
public void unregister(IPartitionReplica replica) {
synchronized (dests) {
- final InetSocketAddress location = replica.getIdentifier().getLocation();
+ final InetSocketAddress location = NetworkUtil.ensureUnresolved(replica.getIdentifier().getLocation());
final ReplicationDestination dest = dests.get(location);
if (dest == null) {
LOGGER.warn(() -> "Asked to unregister unknown replica " + replica);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 123709b..261236c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -44,8 +44,7 @@
}
public void sync() throws IOException {
- final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
- synchronized (syncLock) {
+ synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
diff --git a/hyracks-fullstack/NOTICE b/hyracks-fullstack/NOTICE
index 77f31ad..95fe98a 100644
--- a/hyracks-fullstack/NOTICE
+++ b/hyracks-fullstack/NOTICE
@@ -1,5 +1,5 @@
Apache Hyracks and Algebricks
-Copyright 2015-2019 The Apache Software Foundation
+Copyright 2015-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index 7e00d82..9528eaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -205,7 +205,7 @@
<dependency>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
- <version>2.0.2</version>
+ <version>1.0.b2</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
index db0ed6b..a3578ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.ipc.api.IIPCEventListener;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -78,7 +79,7 @@
}
LOGGER.warn("ipcHandle {} disconnected; will attempt to reconnect {} times", delegate, reconnectAttempts);
listener.ipcHandleDisconnected(delegate);
- delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
+ delegate = ipc.getHandle(NetworkUtil.refresh(getRemoteAddress()), reconnectAttempts);
LOGGER.warn("ipcHandle {} restored", delegate);
listener.ipcHandleRestored(delegate);
return delegate;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 763319f..3f6e90c 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -28,6 +28,7 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import javax.net.ssl.SSLEngine;
@@ -130,4 +131,22 @@
enlargedBuffer.put(src);
return enlargedBuffer;
}
+
+ public static InetSocketAddress ensureUnresolved(InetSocketAddress address) {
+ return address.isUnresolved() ? address // already unresolved: returned as-is
+ : InetSocketAddress.createUnresolved(address.getHostString(), address.getPort()); // no lookup
+ }
+
+ public static InetSocketAddress ensureResolved(InetSocketAddress address) { // best effort: may stay unresolved
+ return address.isUnresolved() ? new InetSocketAddress(address.getHostString(), address.getPort()) : address;
+ }
+
+ public static InetSocketAddress refresh(InetSocketAddress original) { // rebuilds the address from host + port
+ InetSocketAddress refreshed = new InetSocketAddress(original.getHostString(), original.getPort());
+ if (!Objects.equals(original.getAddress(), refreshed.getAddress())) { // null-safe: unresolved has no address
+ LOGGER.warn("ip address updated on refresh (was: {}, now: {})", original.getAddress(),
+ refreshed.getAddress());
+ }
+ return refreshed; // always the newly built instance, even if its lookup failed
+ }
}