Merge branch 'gerrit/march-hare'

Change-Id: I477a59907d5d20aa88202c6fb1fa2b8af165e6b9
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index e294510..9876eed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -75,10 +76,13 @@
             response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
         } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
             response.setStatus(HttpResponseStatus.NOT_FOUND);
+        } catch (RejectedExecutionException e) {
+            // we must be shutting down, return 503
+            LOGGER.info("RejectedExecutionException while servicing request; returning 503", e);
+            sendError(response, HttpResponseStatus.SERVICE_UNAVAILABLE, null);
         } catch (Exception e) {
-            LOGGER.log(Level.INFO, "exception thrown for " + request, e);
-            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-            responseWriter.write(e.toString());
+            LOGGER.warn("exception while servicing request; returning 500", e);
+            sendError(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString());
         }
         responseWriter.flush();
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 18191d6..0d86cb9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -51,8 +52,9 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -61,13 +63,16 @@
     private static final Logger LOGGER = LogManager.getLogger();
     protected IClusterStateManager clusterManager;
     protected volatile String metadataNodeId;
-    protected Set<String> pendingStartupCompletionNodes = new HashSet<>();
+    protected Set<String> pendingStartupCompletionNodes = Collections.synchronizedSet(new HashSet<>());
     protected final ICCMessageBroker messageBroker;
     private final boolean replicationEnabled;
+    private final IGatekeeper gatekeeper;
 
     public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) {
         this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
         this.replicationEnabled = replicationEnabled;
+        final ClusterControllerService ccs = (ClusterControllerService) serviceCtx.getControllerService();
+        this.gatekeeper = ccs.getApplication().getGatekeeper(); // consulted before accepting startup reports
     }
 
     @Override
@@ -120,7 +125,14 @@
     }
 
     private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
-        pendingStartupCompletionNodes.remove(msg.getNodeId());
+        if (!pendingStartupCompletionNodes.remove(msg.getNodeId())) {
+            LOGGER.warn("Received unexpected startup completion message from node {}", msg.getNodeId());
+        }
+        if (!gatekeeper.isAuthorized(msg.getNodeId())) {
+            LOGGER.warn("Node {} lost authorization before startup completed; ignoring registration result",
+                    msg.getNodeId());
+            return;
+        }
         if (msg.isSuccess()) {
             clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
             if (msg.getNodeId().equals(metadataNodeId)) {
@@ -128,9 +140,7 @@
             }
             clusterManager.refreshState();
         } else {
-            if (LOGGER.isErrorEnabled()) {
-                LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException());
-            }
+            LOGGER.error("Node {} failed to complete startup", msg.getNodeId(), msg.getException());
         }
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index b7a8c63..9cc295e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
@@ -211,10 +212,13 @@
         final ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
         JobIdFactory jobIdFactory = new JobIdFactory(CcId.valueOf(0));
         Mockito.when(ccs.getJobIdFactory()).thenReturn(jobIdFactory);
+        final CCApplication ccApplication = Mockito.mock(CCApplication.class);
+        Mockito.when(ccs.getApplication()).thenReturn(ccApplication);
         Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
         Mockito.when(iccServiceContext.getControllerService()).thenReturn(ccs);
 
         Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+        Mockito.when(ccApplication.getGatekeeper()).thenReturn(nodeId -> true);
 
         NcLifecycleCoordinator coordinator =
                 new NcLifecycleCoordinator(ccApplicationContext.getServiceContext(), false);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 6c39372..a37e6e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -120,7 +120,6 @@
 
     /**
      * Register the specified node partitions with the specified nodeId with this cluster state manager
-     * then calls {@link IClusterStateManager#refreshState()}
      *
      * @param nodeId
      * @param nodePartitions
@@ -130,7 +129,6 @@
 
     /**
      * De-register the specified node's partitions from this cluster state manager
-     * then calls {@link IClusterStateManager#refreshState()}
      *
      * @param nodeId
      * @throws HyracksDataException
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java
index 2fe9de8..f835c43 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java
@@ -52,9 +52,9 @@
     Set<IPartitionReplica> getReplicas();
 
     /**
-     * Gets the location of this {@link IReplicationDestination}
+     * Gets the (resolved) location of this {@link IReplicationDestination}
      *
-     * @return the location
+     * @return the (resolved) location
      */
     InetSocketAddress getLocation();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index f68ad09..c4bb74c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -20,11 +20,13 @@
 
 import java.net.InetSocketAddress;
 
+import org.apache.hyracks.util.NetworkUtil;
+
 public class ReplicaIdentifier {
 
     private final int partition;
-    private final InetSocketAddress location;
     private final String id;
+    private volatile InetSocketAddress location;
 
     private ReplicaIdentifier(int partition, InetSocketAddress location) {
         this.partition = partition;
@@ -44,6 +46,12 @@
         return location;
     }
 
+    public InetSocketAddress refreshLocation() {
+        //noinspection NonAtomicOperationOnVolatileField
+        location = NetworkUtil.refresh(location); // re-resolves the host and replaces the cached address
+        return location;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index e81c25a..f2d2496 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -97,7 +97,7 @@
     public synchronized ISocketChannel getChannel() {
         try {
             if (!NetworkingUtil.isHealthy(sc)) {
-                sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation());
+                establishReplicaConnection();
             }
             return sc;
         } catch (IOException e) {
@@ -105,6 +105,15 @@
         }
     }
 
+    private void establishReplicaConnection() throws IOException {
+        try {
+            sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation());
+        } catch (Exception ignored) {
+            // first attempt failed: the replica may have moved to a new IP, so re-resolve its host and retry once
+            sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.refreshLocation());
+        }
+    }
+
     public synchronized void close() {
         try {
             if (sc != null) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index ef1bc28..782a801 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -39,15 +40,20 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final Set<IPartitionReplica> replicas = new HashSet<>();
-    private final InetSocketAddress location;
+    private final InetSocketAddress inputLocation;
+    private InetSocketAddress resolvedLocation;
     private ISocketChannel logRepChannel;
 
     private ReplicationDestination(InetSocketAddress location) {
-        this.location = location;
+        this.inputLocation = location; // address as passed to this constructor; used by equals()/hashCode()
+        this.resolvedLocation = NetworkUtil.ensureResolved(location); // address used when connecting
     }
 
     public static ReplicationDestination at(InetSocketAddress location) {
-        return new ReplicationDestination(location);
+        if (!location.isUnresolved()) {
+            throw new IllegalArgumentException("only unresolved addresses are allowed!");
+        }
+        return new ReplicationDestination(location); // identity stays the unresolved input; the ctor resolves it
     }
 
     @Override
@@ -79,7 +85,7 @@
     public synchronized ISocketChannel getLogReplicationChannel(INcApplicationContext appCtx) {
         try {
             if (!NetworkingUtil.isHealthy(logRepChannel)) {
-                logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, location);
+                establishReplicaConnection(appCtx);
             }
             return logRepChannel;
         } catch (IOException e) {
@@ -87,6 +93,16 @@
         }
     }
 
+    protected void establishReplicaConnection(INcApplicationContext appCtx) throws IOException {
+        try {
+            logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
+        } catch (Exception ignored) {
+            // first attempt failed: the replica may have moved to a new IP, so re-resolve its host and retry once
+            resolvedLocation = NetworkUtil.refresh(resolvedLocation);
+            logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
+        }
+    }
+
     private synchronized void closeLogReplicationChannel() {
         try {
             if (logRepChannel != null && logRepChannel.getSocketChannel().isOpen()) {
@@ -101,7 +117,7 @@
 
     @Override
     public InetSocketAddress getLocation() {
-        return location;
+        return resolvedLocation; // may change over time: a failed connect re-resolves and replaces it
     }
 
     @Override
@@ -113,16 +129,16 @@
             return false;
         }
         ReplicationDestination that = (ReplicationDestination) o;
-        return Objects.equals(location, that.location);
+        return Objects.equals(inputLocation, that.inputLocation);
     }
 
     @Override
     public String toString() {
-        return location.toString();
+        return resolvedLocation.toString(); // shows the currently resolved address, not the input one
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(location);
+        return Objects.hash(inputLocation); // same field equals() compares, keeping the two consistent
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index bd99ec4..7ed674e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.api.ReplicationDestination;
 import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -60,7 +61,7 @@
     @Override
     public void register(IPartitionReplica replica) {
         synchronized (dests) {
-            final InetSocketAddress location = replica.getIdentifier().getLocation();
+            final InetSocketAddress location = NetworkUtil.ensureUnresolved(replica.getIdentifier().getLocation());
             final ReplicationDestination replicationDest = dests.computeIfAbsent(location, ReplicationDestination::at);
             replicationDest.add(replica);
             logReplicationManager.register(replicationDest);
@@ -71,7 +72,7 @@
     @Override
     public void unregister(IPartitionReplica replica) {
         synchronized (dests) {
-            final InetSocketAddress location = replica.getIdentifier().getLocation();
+            final InetSocketAddress location = NetworkUtil.ensureUnresolved(replica.getIdentifier().getLocation());
             final ReplicationDestination dest = dests.get(location);
             if (dest == null) {
                 LOGGER.warn(() -> "Asked to unregister unknown replica " + replica);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 123709b..261236c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -44,8 +44,7 @@
     }
 
     public void sync() throws IOException {
-        final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
-        synchronized (syncLock) {
+        synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
             final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
             try {
                 // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index 7e00d82..9528eaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -205,7 +205,7 @@
     <dependency>
       <groupId>xml-apis</groupId>
       <artifactId>xml-apis</artifactId>
-      <version>2.0.2</version>
+      <version>1.0.b2</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
index db0ed6b..a3578ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -78,7 +79,7 @@
             }
             LOGGER.warn("ipcHandle {} disconnected; will attempt to reconnect {} times", delegate, reconnectAttempts);
             listener.ipcHandleDisconnected(delegate);
-            delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
+            delegate = ipc.getHandle(NetworkUtil.refresh(getRemoteAddress()), reconnectAttempts);
             LOGGER.warn("ipcHandle {} restored", delegate);
             listener.ipcHandleRestored(delegate);
             return delegate;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 228fd91..1fcd806 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import javax.net.ssl.SSLEngine;
 
@@ -142,4 +143,22 @@
         enlargedBuffer.put(src);
         return enlargedBuffer;
     }
+
+    public static InetSocketAddress ensureUnresolved(InetSocketAddress address) {
+        final boolean resolved = !address.isUnresolved(); // unresolved input is handed back untouched
+        return resolved ? InetSocketAddress.createUnresolved(address.getHostString(), address.getPort()) : address;
+    }
+
+    public static InetSocketAddress ensureResolved(InetSocketAddress address) {
+        return !address.isUnresolved() ? address : new InetSocketAddress(address.getHostString(), address.getPort());
+    }
+
+    public static InetSocketAddress refresh(InetSocketAddress original) {
+        final InetSocketAddress fresh = new InetSocketAddress(original.getHostString(), original.getPort());
+        final boolean moved = !Objects.equals(original.getAddress(), fresh.getAddress());
+        if (moved) {
+            LOGGER.warn("ip address updated on refresh (was: {}, now: {})", original.getAddress(), fresh.getAddress());
+        }
+        return fresh;
+    }
 }