Extensible exception handling in QueryServiceServlet

Change-Id: If8037a97f3d0b0febb8caf68e099f1fd24e0ac49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1836
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 01afdcf..beac085 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -433,6 +433,10 @@
       <artifactId>hyracks-net</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+    </dependency>
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
       <scope>test</scope>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index ba98f73..dea5259 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -202,9 +202,7 @@
             stopNcTheard.join();
         }
 
-        if (cc != null) {
-            cc.stop();
-        }
+        stopCC(false);
 
         if (deleteOldInstanceData) {
             deleteTransactionLogs();
@@ -212,6 +210,13 @@
         }
     }
 
+    public void stopCC(boolean terminateNCService) throws Exception {
+        if (cc != null) {
+            cc.stop(terminateNCService);
+            cc = null;
+        }
+    }
+
     protected String getDefaultStoragePath() {
         return joinPath("target", "io", "dir");
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 2b70685..9547514 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -20,12 +20,16 @@
 package org.apache.asterix.api.http.server;
 
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
 import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.om.types.ARecordType;
@@ -35,6 +39,7 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 /**
  * Query service servlet that can run on NC nodes.
@@ -91,4 +96,14 @@
             sessionOutput.out().append(responseMsg.getResult());
         }
     }
+
+    @Override
+    protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+        if (t instanceof IPCException || t instanceof TimeoutException) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
+            return HttpResponseStatus.SERVICE_UNAVAILABLE;
+        } else {
+            return super.handleExecuteStatementException(t);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ec90ae9..9ee064e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -420,21 +420,10 @@
                 ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
             }
             errorCount = 0;
-        } catch (AlgebricksException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, pe.getMessage(), pe);
-            ResultUtil.printError(resultWriter, pe);
-            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
-            status = HttpResponseStatus.BAD_REQUEST;
-        } catch (HyracksException pe) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe);
-            ResultUtil.printError(resultWriter, pe);
-            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
-            status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
-        } catch (Exception e) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
+        } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) {
+            status = handleExecuteStatementException(e);
             ResultUtil.printError(resultWriter, e);
             ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
-            status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
         } finally {
             if (execStartEnd[0] == -1) {
                 execStartEnd[1] = -1;
@@ -475,4 +464,18 @@
                 param.clientContextID, queryCtx);
         outExecStartEnd[1] = System.nanoTime();
     }
+
+    protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+        if (t instanceof org.apache.asterix.aqlplus.parser.TokenMgrError || t instanceof TokenMgrError
+                || t instanceof AlgebricksException) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, t.getMessage(), t);
+            return HttpResponseStatus.BAD_REQUEST;
+        } else if (t instanceof HyracksException) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.getMessage(), t);
+            return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+        } else {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", t);
+            return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 8b1bbe0..3d6543b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -48,7 +48,11 @@
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,10 +65,20 @@
 
 public class ConnectorApiServletTest {
 
-    @Test
-    public void testGet() throws Exception {
+    @BeforeClass
+    public static void setup() throws Exception {
         // Starts test asterixdb cluster.
         SqlppExecutionTest.setUp();
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        // Tears down the asterixdb cluster.
+        SqlppExecutionTest.tearDown();
+    }
+
+    @Test
+    public void testGet() throws Exception {
         // Configures a test connector api servlet.
         ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" },
                 (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext());
@@ -113,9 +127,6 @@
         ArrayNode splits = (ArrayNode) actualResponse.get("splits");
         String path = (splits.get(0)).get("path").asText();
         Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
-
-        // Tears down the asterixdb cluster.
-        SqlppExecutionTest.tearDown();
     }
 
     @Test
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 1744836..50c2986 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.function.Predicate;
 
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest;
@@ -46,14 +47,15 @@
 
     @Override
     public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
-            List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable)
-            throws Exception {
+            List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded,
+            Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
         String clientContextId = UUID.randomUUID().toString();
         final List<TestCase.CompilationUnit.Parameter> newParams =
                 cancellable ? upsertParam(params, "client_context_id", clientContextId) : params;
         Callable<InputStream> query = () -> {
             try {
-                return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded, true);
+                return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded,
+                        responseCodeValidator, true);
             } catch (Exception e) {
                 e.printStackTrace();
                 throw e;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 35ab1df..ed6a77a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -518,17 +518,27 @@
 
     public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
             List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
-        return executeQueryService(str, fmt, uri, params, jsonEncoded, false);
+        return executeQueryService(str, fmt, uri, params, jsonEncoded, null, false);
+    }
+
+    public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
+            List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator)
+            throws Exception {
+        return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false);
     }
 
     protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
-            List<CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) throws Exception {
+            List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator,
+            boolean cancellable) throws Exception {
         final List<CompilationUnit.Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
         HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
                 : constructPostMethodUrl(str, uri, "statement", newParams);
         // Set accepted output response type
         method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType());
         HttpResponse response = executeHttpRequest(method);
+        if (responseCodeValidator != null) {
+            checkResponse(response, responseCodeValidator);
+        }
         return response.getEntity().getContent();
     }
 
@@ -637,8 +647,13 @@
     }
 
     public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception {
+        return executeJSONGet(fmt, uri, code -> code == HttpStatus.SC_OK);
+    }
+
+    public InputStream executeJSONGet(OutputFormat fmt, URI uri, Predicate<Integer> responseCodeValidator)
+            throws Exception {
         HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>());
-        HttpResponse response = executeAndCheckHttpRequest(request);
+        HttpResponse response = executeAndCheckHttpRequest(request, responseCodeValidator);
         return response.getEntity().getContent();
     }
 
@@ -1101,7 +1116,7 @@
             }
             final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
             if (DELIVERY_IMMEDIATE.equals(delivery)) {
-                resultStream = executeQueryService(statement, fmt, uri, params, true, true);
+                resultStream = executeQueryService(statement, fmt, uri, params, true, null, true);
                 resultStream = ResultExtractor.extract(resultStream);
             } else {
                 String handleVar = getHandleVariable(statement);
@@ -1355,7 +1370,7 @@
         return uri;
     }
 
-    protected URI getEndpoint(String servlet) throws URISyntaxException {
+    public URI getEndpoint(String servlet) throws URISyntaxException {
         return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null);
     }
 
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index e49f9aa..dd86bea 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -826,6 +826,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks-ipc</artifactId>
+        <version>${hyracks.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hyracks</groupId>
         <artifactId>algebricks-compiler</artifactId>
         <version>${algebricks.version}</version>
       </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 00b3cb6..e89ed56 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -78,7 +78,7 @@
 
     protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) {
         DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status);
-        ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
     private void submit(ChannelHandlerContext ctx, IServlet servlet, FullHttpRequest request) throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 45c5e04..cc3a513 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -41,6 +41,10 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 9efd70e..d1659a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -23,6 +23,7 @@
 import java.net.ServerSocket;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
@@ -32,13 +33,17 @@
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.io.IOUtils;
+
 public class IPCConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
 
@@ -88,9 +93,10 @@
         networkThread.start();
     }
 
-    void stop() throws IOException {
+    void stop() {
         stopped = true;
-        serverSocketChannel.close();
+        IOUtils.closeQuietly(serverSocketChannel);
+        networkThread.selector.wakeup();
     }
 
     IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
@@ -174,6 +180,8 @@
     private class NetworkThread extends Thread {
         private final Selector selector;
 
+        private final Set<SocketChannel> openChannels = new HashSet<>();
+
         public NetworkThread() {
             super("IPC Network Listener Thread [" + address + "]");
             setDaemon(true);
@@ -187,6 +195,14 @@
         @Override
         public void run() {
             try {
+                doRun();
+            } finally {
+                cleanup();
+            }
+        }
+
+        private void doRun() {
+            try {
                 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
             } catch (ClosedChannelException e) {
                 throw new RuntimeException(e);
@@ -204,6 +220,7 @@
                     if (!workingPendingConnections.isEmpty()) {
                         for (IPCHandle handle : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
+                            openChannels.add(channel);
                             channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                             channel.configureBlocking(false);
                             SelectionKey cKey;
@@ -269,7 +286,8 @@
                                 system.getPerformanceCounters().addMessageBytesReceived(len);
                                 if (len < 0) {
                                     key.cancel();
-                                    channel.close();
+                                    IOUtils.closeQuietly(channel);
+                                    openChannels.remove(channel);
                                     handle.close();
                                 } else {
                                     handle.processIncomingMessages();
@@ -285,7 +303,8 @@
                                 system.getPerformanceCounters().addMessageBytesSent(len);
                                 if (len < 0) {
                                     key.cancel();
-                                    channel.close();
+                                    IOUtils.closeQuietly(channel);
+                                    openChannels.remove(channel);
                                     handle.close();
                                 } else if (!writeBuffer.hasRemaining()) {
                                     key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
@@ -297,6 +316,7 @@
                             } else if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
+                                openChannels.add(channel);
                                 channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                                 channel.configureBlocking(false);
                                 IPCHandle handle = new IPCHandle(system, null);
@@ -334,6 +354,14 @@
             }
         }
 
+        private void cleanup() {
+            for (Channel channel : openChannels) {
+                IOUtils.closeQuietly(channel);
+            }
+            openChannels.clear();
+            IOUtils.closeQuietly(selector);
+        }
+
         private boolean finishConnect(SocketChannel channel) {
             boolean connectFinished = false;
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 0997e57..d9ab210 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -59,7 +59,8 @@
     public void start() {
         cMgr.start();
     }
-    public void stop() throws IOException{
+
+    public void stop() {
         cMgr.stop();
     }