Merge branch 'gerrit/neo' into 'gerrit/trinity'
Change-Id: I79302832b29c734100a3a90f28af07764f57a1ac
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
index 61361f6..265791e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Set;
+import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.metadata.declared.DataSource;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.entities.Dataset;
@@ -165,10 +166,14 @@
// Check that all datascans scan the same dataset, and that the join condition
// only uses primary key variables of those datascans.
for (int i = 0; i < dataScans.size(); i++) {
+ DatasetDataSource currentDataSource = (DatasetDataSource) dataScans.get(i).getDataSource();
+ if (currentDataSource.getDataset().getDatasetType() == DatasetConfig.DatasetType.EXTERNAL) {
+ // The PK condition is not satisfied when external datasets are involved (no PKs)
+ return -1;
+ }
if (i > 0) {
- DatasetDataSource prevAqlDataSource = (DatasetDataSource) dataScans.get(i - 1).getDataSource();
- DatasetDataSource currAqlDataSource = (DatasetDataSource) dataScans.get(i).getDataSource();
- if (!prevAqlDataSource.getDataset().equals(currAqlDataSource.getDataset())) {
+ DatasetDataSource previousDataSource = (DatasetDataSource) dataScans.get(i - 1).getDataSource();
+ if (!previousDataSource.getDataset().equals(currentDataSource.getDataset())) {
return -1;
}
}
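Aside, for readers of this hunk: a self-contained sketch of the check's new shape, using hypothetical stand-in types rather than AsterixDB classes. The point it illustrates is grounded in the change above: the rule now reports "not applicable" (-1) as soon as any scanned dataset is external, because external datasets carry no primary keys for the one-to-one equi-join condition to use.

```java
import java.util.List;

public class ExternalScanGuardSketch {
    // hypothetical stand-ins for the dataset metadata consulted by the rule
    enum DatasetType { INTERNAL, EXTERNAL }

    record Scan(String dataset, DatasetType type) {}

    // returns -1 ("rule does not apply") when any scan is external or the scans
    // target different datasets; 0 otherwise (this sketch's own convention)
    static int checkSameInternalDataset(List<Scan> scans) {
        for (int i = 0; i < scans.size(); i++) {
            if (scans.get(i).type() == DatasetType.EXTERNAL) {
                return -1; // no PKs, so the equi-join cannot be removed
            }
            if (i > 0 && !scans.get(i - 1).dataset().equals(scans.get(i).dataset())) {
                return -1; // different datasets
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(checkSameInternalDataset(List.of(
                new Scan("Orders", DatasetType.EXTERNAL),
                new Scan("Orders", DatasetType.EXTERNAL)))); // -1: join is kept
    }
}
```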
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
index b6913e4..20d79d5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
@@ -19,6 +19,8 @@
package org.apache.asterix.api.http.server;
+import static org.apache.asterix.utils.RedactionUtil.REDACTED_SENSITIVE_ENTRY_VALUE;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,6 +45,7 @@
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.LogRedactionUtil;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -80,7 +83,8 @@
SIGNATURE("signature"),
MULTI_STATEMENT("multi-statement"),
MAX_WARNINGS("max-warnings"),
- SQL_COMPAT("sql-compat");
+ SQL_COMPAT("sql-compat"),
+ SOURCE("source");
private final String str;
@@ -124,6 +128,7 @@
private String statement;
private String clientContextID;
private String dataverse;
+ private String source;
private ClientType clientType = ClientType.ASTERIX;
private OutputFormat format = OutputFormat.CLEAN_JSON;
private ResultDelivery mode = ResultDelivery.IMMEDIATE;
@@ -172,6 +177,14 @@
this.statement = statement;
}
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
public OutputFormat getFormat() {
return format;
}
@@ -380,7 +393,8 @@
ObjectNode object = OBJECT_MAPPER.createObjectNode();
object.put("host", host);
object.put("path", path);
- object.put("statement", statement != null ? JSONUtil.escape(new StringBuilder(), statement).toString() : null);
+ object.put("statement", statement != null
+ ? LogRedactionUtil.statement(JSONUtil.escape(new StringBuilder(), statement).toString()) : null);
object.put("pretty", pretty);
object.put("mode", mode.getName());
object.put("clientContextID", clientContextID);
@@ -402,9 +416,10 @@
object.put("readOnly", readOnly);
object.put("maxWarnings", maxWarnings);
object.put("sqlCompat", sqlCompatMode);
+ object.put("source", source);
if (statementParams != null) {
for (Map.Entry<String, JsonNode> statementParam : statementParams.entrySet()) {
- object.set('$' + statementParam.getKey(), statementParam.getValue());
+ object.set('$' + statementParam.getKey(), REDACTED_SENSITIVE_ENTRY_VALUE);
}
}
return object;
@@ -486,6 +501,7 @@
setSignature(parseBoolean(req, Parameter.SIGNATURE.str(), valGetter, isSignature()));
setClientType(parseIfExists(req, Parameter.CLIENT_TYPE.str(), valGetter, getClientType(), clientTypes::get));
setSQLCompatMode(parseBoolean(req, Parameter.SQL_COMPAT.str(), valGetter, isSQLCompatMode()));
+ setSource(valGetter.apply(req, Parameter.SOURCE.str()));
}
protected void setExtraParams(JsonNode jsonRequest) throws HyracksDataException {
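Illustrative sketch of the logging effect of the changes above, written with plain Jackson; the parameter name and value are hypothetical. Statement-parameter names are still included in the logged summary, but each value is replaced by the shared redaction text node, and the new optional "source" request parameter is carried through as-is.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;

public class RedactedParamsSketch {
    // stand-in for RedactionUtil.REDACTED_SENSITIVE_ENTRY_VALUE
    private static final TextNode REDACTED = new TextNode("<redacted sensitive entry>");

    public static void main(String[] args) {
        ObjectNode object = new ObjectMapper().createObjectNode();
        object.put("source", "my-client");        // new optional 'source' request parameter
        object.set("$secretAccessKey", REDACTED); // statement parameter: name kept, value dropped
        System.out.println(object);
        // {"source":"my-client","$secretAccessKey":"<redacted sensitive entry>"}
    }
}
```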
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index c57cc3b..f84e45a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -90,7 +90,6 @@
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
-import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -278,7 +277,10 @@
if (forceReadOnly) {
param.setReadOnly(true);
}
- LOGGER.info(() -> "handleRequest: " + LogRedactionUtil.statement(param.toString()));
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("handleRequest: uuid={}, clientContextID={}, {}", requestRef.getUuid(),
+ param.getClientContextID(), param.toString());
+ }
delivery = param.getMode();
setSessionConfig(sessionOutput, param, delivery);
final ResultProperties resultProperties = new ResultProperties(delivery, param.getMaxResultReads());
@@ -430,14 +432,12 @@
executionState.setStatus(ResultStatus.FATAL, HttpResponseStatus.BAD_REQUEST);
return true;
case REQUEST_TIMEOUT:
- LOGGER.info(() -> "handleException: request execution timed out: "
- + LogRedactionUtil.userData(param.toString()));
+ LOGGER.info(() -> "handleException: request execution timed out: " + param.toString());
executionState.setStatus(ResultStatus.TIMEOUT, HttpResponseStatus.OK);
return true;
case REJECT_NODE_UNREGISTERED:
case REJECT_BAD_CLUSTER_STATE:
- LOGGER.warn(() -> "handleException: " + ex.getMessage() + ": "
- + LogRedactionUtil.userData(param.toString()));
+ LOGGER.warn(() -> "handleException: " + ex.getMessage() + ": " + param.toString());
executionState.setStatus(ResultStatus.FATAL, HttpResponseStatus.SERVICE_UNAVAILABLE);
return true;
default:
@@ -457,11 +457,9 @@
QueryServiceRequestParameters param, IServletResponse response) {
if (t instanceof org.apache.asterix.lang.sqlpp.parser.TokenMgrError || t instanceof AlgebricksException) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("handleException: {}: {}", t.getMessage(), LogRedactionUtil.statement(param.toString()),
- t);
+ LOGGER.debug("handleException: {}: {}", t.getMessage(), param.toString(), t);
} else {
- LOGGER.info(() -> "handleException: " + t.getMessage() + ": "
- + LogRedactionUtil.statement(param.toString()));
+ LOGGER.info(() -> "handleException: " + t.getMessage() + ": " + param.toString());
}
executionState.setStatus(ResultStatus.FATAL, HttpResponseStatus.BAD_REQUEST);
return;
@@ -473,7 +471,7 @@
return;
}
}
- LOGGER.warn(() -> "handleException: unexpected exception: " + LogRedactionUtil.userData(param.toString()), t);
+ LOGGER.warn(() -> "handleException: unexpected exception: " + param.toString(), t);
executionState.setStatus(ResultStatus.FATAL, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 05bc87b..e314177 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -243,7 +243,8 @@
@Override
public String toString() {
- return String.format("%s(id=%s, from=%s, uuid=%s): %s", getClass().getSimpleName(), requestMessageId,
- requestNodeId, requestReference.getUuid(), LogRedactionUtil.statement(statementsText));
+ return String.format("%s(id=%s, from=%s, uuid=%s, clientContextID=%s): %s", getClass().getSimpleName(),
+ requestMessageId, requestNodeId, requestReference.getUuid(), clientContextID,
+ LogRedactionUtil.statement(statementsText));
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e602b76..0db8dff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4864,9 +4864,9 @@
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("createAndRunJob jobId:{}, uuid:{}", jobId,
- requestParameters.getRequestReference().getUuid());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId,
+ requestParameters.getRequestReference().getUuid(), requestParameters.getClientContextId());
}
clientRequest.setJobId(jobId);
if (jId != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
index 48cf511..d8b47b8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
@@ -26,6 +26,8 @@
import org.apache.hyracks.util.ILogRedactor;
+import com.fasterxml.jackson.databind.node.TextNode;
+
public class RedactionUtil {
private RedactionUtil() {
throw new AssertionError("do not instantiate");
@@ -34,6 +36,7 @@
private static final Pattern STATEMENT_PATTERN =
Pattern.compile("(" + SECRET_ACCESS_KEY_FIELD_NAME + ").*", CASE_INSENSITIVE | DOTALL);
private static final String STATEMENT_REPLACEMENT = "$1...<redacted sensitive data>";
+ public static final TextNode REDACTED_SENSITIVE_ENTRY_VALUE = new TextNode("<redacted sensitive entry>");
public static final ILogRedactor LOG_REDACTOR = new ILogRedactor() {
@Override
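Illustrative sketch of what the statement pattern does, assuming SECRET_ACCESS_KEY_FIELD_NAME resolves to the literal "secretAccessKey" (that constant is not shown in this hunk): everything from the sensitive field name to the end of the statement is cut and replaced by a fixed marker.

```java
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import static java.util.regex.Pattern.DOTALL;

import java.util.regex.Pattern;

public class StatementRedactionSketch {
    // assumed field-name literal, for illustration only
    private static final Pattern STATEMENT_PATTERN =
            Pattern.compile("(secretAccessKey).*", CASE_INSENSITIVE | DOTALL);
    private static final String STATEMENT_REPLACEMENT = "$1...<redacted sensitive data>";

    public static void main(String[] args) {
        String stmt = "CREATE EXTERNAL DATASET Orders ... (\"secretAccessKey\"=\"abc\"),(\"region\"=\"us-west-2\");";
        System.out.println(STATEMENT_PATTERN.matcher(stmt).replaceAll(STATEMENT_REPLACEMENT));
        // CREATE EXTERNAL DATASET Orders ... ("secretAccessKey...<redacted sensitive data>
    }
}
```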
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/external-cross-product.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/external-cross-product.sqlpp
new file mode 100644
index 0000000..ec96012
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/external-cross-product.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE OpenType AS {
+};
+
+CREATE EXTERNAL DATASET Orders(OpenType) USING localfs
+(
+ ("path"="asterix_nc1://data/json/double-150-11.json"),
+ ("format"="json")
+);
+
+SELECT COUNT(*)
+FROM Orders o1, Orders o2, Orders o3;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/external-cross-product.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/external-cross-product.plan
new file mode 100644
index 0000000..fd95cc1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/external-cross-product.plan
@@ -0,0 +1,45 @@
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ -- AGGREGATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN (test.Orders) |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN (test.Orders) |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN (test.Orders) |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index e07cdd4..d616eb5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -100,7 +100,7 @@
* the subsequent failure
* @return the root exception, or null if both parameters are null
*/
- public static Throwable suppress(Throwable first, Throwable second) {
+ public static <T extends Throwable> T suppress(T first, T second) {
if (second instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
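A minimal standalone sketch of why the generified signature matters; the method body here is a stand-in with the same shape (the real one also restores the interrupt flag for InterruptedException, as the hunk shows). With <T extends Throwable> T, a caller that accumulates failures of a concrete exception type can rethrow the result directly, without the cast a Throwable return type would force; HttpServer.bind() below adopts exactly this pattern.

```java
import java.util.List;

public class SuppressSketch {
    // stand-in mirroring ExceptionUtils.suppress(T, T): keeps the first failure as the
    // root and attaches later ones as suppressed; returns null if both are null
    static <T extends Throwable> T suppress(T first, T second) {
        if (first == null) {
            return second;
        }
        if (second != null && second != first) {
            first.addSuppressed(second);
        }
        return first;
    }

    static void runAll(List<Runnable> tasks) throws Exception {
        Exception failure = null; // concrete type, not Throwable
        for (Runnable task : tasks) {
            try {
                task.run();
            } catch (Exception e) {
                failure = suppress(failure, e); // no cast needed with the generic signature
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```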
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 97b8859..271de53 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,7 +32,10 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.util.MXHelper;
@@ -80,8 +85,7 @@
private final ServletRegistry servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
- private final InetSocketAddress defaultAddress;
- private final List<InetSocketAddress> addresses;
+ private final Set<InetSocketAddress> addresses;
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
@@ -114,15 +118,14 @@
this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler);
}
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses,
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, Collection<InetSocketAddress> addresses,
HttpServerConfig config, IChannelClosedHandler closeHandler) {
if (addresses.isEmpty()) {
throw new IllegalArgumentException("no addresses specified");
}
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
- this.addresses = addresses;
- defaultAddress = addresses.get(0);
+ this.addresses = new LinkedHashSet<>(addresses);
this.closedHandler = closeHandler;
this.config = config;
channels = new ArrayList<>();
@@ -130,9 +133,16 @@
servlets = new ServletRegistry();
workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
int numExecutorThreads = config.getThreadCount();
+ int[] ports = this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
+ String desc;
+ if (ports.length > 1) {
+ desc = this.addresses.stream().map(a -> a.getAddress().getHostAddress() + ":" + a.getPort())
+ .collect(Collectors.joining(",", "[", "]"));
+ } else {
+ desc = "port:" + ports[0];
+ }
executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
- runnable -> new Thread(runnable,
- "HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement()));
+ runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-" + threadId.getAndIncrement()));
long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
+ numExecutorThreads * config.getMaxResponseChunkSize();
LOGGER.log(Level.DEBUG,
@@ -156,7 +166,6 @@
doStart();
setStarted();
} catch (Throwable e) { // NOSONAR
- LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e);
setFailed(e);
throw e;
}
@@ -255,14 +264,19 @@
return servlets.getServlets();
}
- protected void doStart() throws InterruptedException, IOException {
+ protected void doStart() throws Exception {
for (IServlet servlet : servlets.getServlets()) {
- servlet.init();
+ try {
+ servlet.init();
+ } catch (IOException e) {
+ LOGGER.error("Failure initializing servlet {} on http server {}", servlet, addresses, e);
+ throw e;
+ }
}
bind();
}
- private void bind() throws InterruptedException {
+ private void bind() throws Exception {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -270,17 +284,28 @@
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
- List<ChannelFuture> channelFutures = new ArrayList<>();
+ List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new ArrayList<>();
for (InetSocketAddress address : addresses) {
- channelFutures.add(b.bind(address));
+ channelFutures.add(Pair.of(address, b.bind(address)));
}
- for (ChannelFuture future : channelFutures) {
- Channel channel = future.sync().channel();
- channel.closeFuture().addListener(channelCloseListener);
- synchronized (lock) {
- channels.add(channel);
+ Exception failure = null;
+ for (Pair<InetSocketAddress, ChannelFuture> addressFuture : channelFutures) {
+ try {
+ Channel channel = addressFuture.getRight().sync().channel();
+ channel.closeFuture().addListener(channelCloseListener);
+ synchronized (lock) {
+ channels.add(channel);
+ }
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error("Bind failure starting http server at {}", addressFuture.getLeft(), e);
+ failure = ExceptionUtils.suppress(failure, e);
}
}
+ if (failure != null) {
+ throw failure;
+ }
}
private void triggerRecovery() throws InterruptedException {
@@ -395,7 +420,7 @@
@Override
public String toString() {
- return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\""
+ return "{\"class\":\"" + getClass().getSimpleName() + "\",\"addresses\":" + addresses + ",\"state\":\""
+ getState() + "\"}";
}
@@ -403,8 +428,9 @@
return config;
}
+ @Deprecated // returns an arbitrary address (the first one supplied, when the input collection is ordered)
public InetSocketAddress getAddress() {
- return defaultAddress;
+ return addresses.iterator().next();
}
private void closeChannels() throws InterruptedException {
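Illustrative standalone sketch of the new executor thread-name logic introduced above: duplicate addresses are dropped by the LinkedHashSet, and the thread-name suffix is "port:<p>" when all addresses share a single port, or the full address list otherwise.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ThreadNameSketch {
    public static void main(String[] args) {
        // the duplicate address is dropped by the set, mirroring the new constructor
        Set<InetSocketAddress> addresses = new LinkedHashSet<>(List.of(
                new InetSocketAddress("127.0.0.1", 19001),
                new InetSocketAddress("127.0.0.2", 19001),
                new InetSocketAddress("127.0.0.1", 19001)));
        int[] ports = addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
        String desc;
        if (ports.length > 1) {
            desc = addresses.stream().map(a -> a.getAddress().getHostAddress() + ":" + a.getPort())
                    .collect(Collectors.joining(",", "[", "]"));
        } else {
            desc = "port:" + ports[0];
        }
        System.out.println("HttpExecutor(" + desc + ")-0"); // HttpExecutor(port:19001)-0
    }
}
```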