Extensible exception handling in QueryServiceServlet
Change-Id: If8037a97f3d0b0febb8caf68e099f1fd24e0ac49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1836
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 01afdcf..beac085 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -433,6 +433,10 @@
<artifactId>hyracks-net</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<scope>test</scope>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index ba98f73..dea5259 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -202,9 +202,7 @@
stopNcTheard.join();
}
- if (cc != null) {
- cc.stop();
- }
+ stopCC(false);
if (deleteOldInstanceData) {
deleteTransactionLogs();
@@ -212,6 +210,13 @@
}
}
+ public void stopCC(boolean terminateNCService) throws Exception {
+ if (cc != null) {
+ cc.stop(terminateNCService);
+ cc = null;
+ }
+ }
+
protected String getDefaultStoragePath() {
return joinPath("target", "io", "dir");
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 2b70685..9547514 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -20,12 +20,16 @@
package org.apache.asterix.api.http.server;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.om.types.ARecordType;
@@ -35,6 +39,7 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.exceptions.IPCException;
/**
* Query service servlet that can run on NC nodes.
@@ -91,4 +96,14 @@
sessionOutput.out().append(responseMsg.getResult());
}
}
+
+ @Override
+ protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+ if (t instanceof IPCException || t instanceof TimeoutException) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
+ return HttpResponseStatus.SERVICE_UNAVAILABLE;
+ } else {
+ return super.handleExecuteStatementException(t);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ec90ae9..9ee064e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -420,21 +420,10 @@
ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
}
errorCount = 0;
- } catch (AlgebricksException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, pe.getMessage(), pe);
- ResultUtil.printError(resultWriter, pe);
- ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
- status = HttpResponseStatus.BAD_REQUEST;
- } catch (HyracksException pe) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe);
- ResultUtil.printError(resultWriter, pe);
- ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
- status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
- } catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
+ } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) {
+ status = handleExecuteStatementException(e);
ResultUtil.printError(resultWriter, e);
ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
- status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
} finally {
if (execStartEnd[0] == -1) {
execStartEnd[1] = -1;
@@ -475,4 +464,18 @@
param.clientContextID, queryCtx);
outExecStartEnd[1] = System.nanoTime();
}
+
+ protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+ if (t instanceof org.apache.asterix.aqlplus.parser.TokenMgrError || t instanceof TokenMgrError
+ || t instanceof AlgebricksException) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, t.getMessage(), t);
+ return HttpResponseStatus.BAD_REQUEST;
+ } else if (t instanceof HyracksException) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.getMessage(), t);
+ return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+ } else {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", t);
+ return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+ }
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 8b1bbe0..3d6543b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -48,7 +48,11 @@
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,10 +65,20 @@
public class ConnectorApiServletTest {
- @Test
- public void testGet() throws Exception {
+ @BeforeClass
+ public static void setup() throws Exception {
// Starts test asterixdb cluster.
SqlppExecutionTest.setUp();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ // Tears down the asterixdb cluster.
+ SqlppExecutionTest.tearDown();
+ }
+
+ @Test
+ public void testGet() throws Exception {
// Configures a test connector api servlet.
ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" },
(ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext());
@@ -113,9 +127,6 @@
ArrayNode splits = (ArrayNode) actualResponse.get("splits");
String path = (splits.get(0)).get("path").asText();
Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
-
- // Tears down the asterixdb cluster.
- SqlppExecutionTest.tearDown();
}
@Test
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 1744836..50c2986 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -29,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.function.Predicate;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest;
@@ -46,14 +47,15 @@
@Override
public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
- List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable)
- throws Exception {
+ List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded,
+ Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
String clientContextId = UUID.randomUUID().toString();
final List<TestCase.CompilationUnit.Parameter> newParams =
cancellable ? upsertParam(params, "client_context_id", clientContextId) : params;
Callable<InputStream> query = () -> {
try {
- return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded, true);
+ return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded,
+ responseCodeValidator, true);
} catch (Exception e) {
e.printStackTrace();
throw e;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 35ab1df..ed6a77a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -518,17 +518,27 @@
public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
- return executeQueryService(str, fmt, uri, params, jsonEncoded, false);
+ return executeQueryService(str, fmt, uri, params, jsonEncoded, null, false);
+ }
+
+ public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
+ List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator)
+ throws Exception {
+ return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false);
}
protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
- List<CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) throws Exception {
+ List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator,
+ boolean cancellable) throws Exception {
final List<CompilationUnit.Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
: constructPostMethodUrl(str, uri, "statement", newParams);
// Set accepted output response type
method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType());
HttpResponse response = executeHttpRequest(method);
+ if (responseCodeValidator != null) {
+ checkResponse(response, responseCodeValidator);
+ }
return response.getEntity().getContent();
}
@@ -637,8 +647,13 @@
}
public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception {
+ return executeJSONGet(fmt, uri, code -> code == HttpStatus.SC_OK);
+ }
+
+ public InputStream executeJSONGet(OutputFormat fmt, URI uri, Predicate<Integer> responseCodeValidator)
+ throws Exception {
HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>());
- HttpResponse response = executeAndCheckHttpRequest(request);
+ HttpResponse response = executeAndCheckHttpRequest(request, responseCodeValidator);
return response.getEntity().getContent();
}
@@ -1101,7 +1116,7 @@
}
final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
if (DELIVERY_IMMEDIATE.equals(delivery)) {
- resultStream = executeQueryService(statement, fmt, uri, params, true, true);
+ resultStream = executeQueryService(statement, fmt, uri, params, true, null, true);
resultStream = ResultExtractor.extract(resultStream);
} else {
String handleVar = getHandleVariable(statement);
@@ -1355,7 +1370,7 @@
return uri;
}
- protected URI getEndpoint(String servlet) throws URISyntaxException {
+ public URI getEndpoint(String servlet) throws URISyntaxException {
return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null);
}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index e49f9aa..dd86bea 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -826,6 +826,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-compiler</artifactId>
<version>${algebricks.version}</version>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 00b3cb6..e89ed56 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -78,7 +78,7 @@
protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) {
DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status);
- ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private void submit(ChannelHandlerContext ctx, IServlet servlet, FullHttpRequest request) throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 45c5e04..cc3a513 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -41,6 +41,10 @@
</properties>
<dependencies>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 9efd70e..d1659a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -23,6 +23,7 @@
import java.net.ServerSocket;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@@ -32,13 +33,17 @@
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.commons.io.IOUtils;
+
public class IPCConnectionManager {
private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
@@ -88,9 +93,10 @@
networkThread.start();
}
- void stop() throws IOException {
+ void stop() {
stopped = true;
- serverSocketChannel.close();
+ IOUtils.closeQuietly(serverSocketChannel);
+ networkThread.selector.wakeup();
}
IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
@@ -174,6 +180,8 @@
private class NetworkThread extends Thread {
private final Selector selector;
+ private final Set<SocketChannel> openChannels = new HashSet<>();
+
public NetworkThread() {
super("IPC Network Listener Thread [" + address + "]");
setDaemon(true);
@@ -187,6 +195,14 @@
@Override
public void run() {
try {
+ doRun();
+ } finally {
+ cleanup();
+ }
+ }
+
+ private void doRun() {
+ try {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
@@ -204,6 +220,7 @@
if (!workingPendingConnections.isEmpty()) {
for (IPCHandle handle : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
+ openChannels.add(channel);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
SelectionKey cKey;
@@ -269,7 +286,8 @@
system.getPerformanceCounters().addMessageBytesReceived(len);
if (len < 0) {
key.cancel();
- channel.close();
+ IOUtils.closeQuietly(channel);
+ openChannels.remove(channel);
handle.close();
} else {
handle.processIncomingMessages();
@@ -285,7 +303,8 @@
system.getPerformanceCounters().addMessageBytesSent(len);
if (len < 0) {
key.cancel();
- channel.close();
+ IOUtils.closeQuietly(channel);
+ openChannels.remove(channel);
handle.close();
} else if (!writeBuffer.hasRemaining()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
@@ -297,6 +316,7 @@
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
+ openChannels.add(channel);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
IPCHandle handle = new IPCHandle(system, null);
@@ -334,6 +354,14 @@
}
}
+ private void cleanup() {
+ for (Channel channel : openChannels) {
+ IOUtils.closeQuietly(channel);
+ }
+ openChannels.clear();
+ IOUtils.closeQuietly(selector);
+ }
+
private boolean finishConnect(SocketChannel channel) {
boolean connectFinished = false;
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 0997e57..d9ab210 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -59,7 +59,8 @@
public void start() {
cMgr.start();
}
- public void stop() throws IOException{
+
+ public void stop() {
cMgr.stop();
}