Use Chunked Http Response
Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1474
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index c38e0a9..ab05f10 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -52,10 +52,11 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -107,7 +108,7 @@
String printJob = request.getParameter("print-job");
String executeQuery = request.getParameter("execute-query");
try {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure setting content type", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
@@ -158,7 +159,7 @@
if ("/".equals(requestURI)) {
try {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure setting content type", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
@@ -177,7 +178,7 @@
// Special handler for font files and .png resources
if (resourcePath.endsWith(".png")) {
BufferedImage img = ImageIO.read(is);
- IServletResponse.setContentType(response, IServlet.ContentType.IMG_PNG);
+ ServletUtils.setContentType(response, IServlet.ContentType.IMG_PNG);
OutputStream outputStream = response.outputStream();
String formatName = "png";
ImageIO.write(img, formatName, outputStream);
@@ -185,7 +186,7 @@
return;
}
String type = IServlet.ContentType.mime(QueryWebInterfaceServlet.extension(resourcePath));
- IServletResponse.setContentType(response, "".equals(type) ? IServlet.ContentType.TEXT_PLAIN : type,
+ ServletUtils.setContentType(response, "".equals(type) ? IServlet.ContentType.TEXT_PLAIN : type,
IServlet.Encoding.UTF8);
writeOutput(response, is, resourcePath);
} catch (IOException e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index 6fd6c47..038ed2f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -32,10 +32,11 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.utils.JSONUtil;
import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -66,7 +67,7 @@
}
protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException {
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
PrintWriter responseWriter = response.writer();
try {
ObjectNode json;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 4419e8a..3d9167e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -27,9 +27,10 @@
import java.util.logging.Logger;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -57,7 +58,7 @@
} else {
json = processNode(request, hcc);
}
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
responseWriter.write(new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json));
} catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
response.setStatus(HttpResponseStatus.NOT_FOUND);
@@ -69,8 +70,7 @@
responseWriter.flush();
}
- private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc)
- throws Exception {
+ private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc) throws Exception {
String pathInfo = path(request);
if (pathInfo.endsWith("/")) {
throw new IllegalArgumentException();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index d832672..caa00f1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -38,10 +38,11 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -71,7 +72,7 @@
}
response.setStatus(HttpResponseStatus.OK);
try {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure setting content type", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
@@ -107,8 +108,8 @@
return;
}
boolean temp = dataset.getDatasetDetails().isTemp();
- FileSplit[] fileSplits = metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName,
- datasetName, temp);
+ FileSplit[] fileSplits =
+ metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
dataset.getItemTypeName());
List<List<String>> primaryKeys = DatasetUtils.getPartitioningKeys(dataset);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java
index bcc6914..be6e280 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java
@@ -23,7 +23,7 @@
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.api.IServletRequest;
public class DdlApiServlet extends RestApiServlet {
private static final byte ALLOWED_CATEGORIES =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index d91352d..e6a32a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -37,9 +37,10 @@
import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -56,7 +57,7 @@
@Override
protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException {
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
PrintWriter responseWriter = response.writer();
ObjectNode json;
ObjectMapper om = new ObjectMapper();
@@ -95,7 +96,8 @@
Map<String, Map<String, Future<ObjectNode>>> ncDataMap = new HashMap<>();
for (String nc : AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames()) {
Map<String, Future<ObjectNode>> ncData = new HashMap<>();
- ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
+ ncData.put("threaddump",
+ executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
ncData.put("config", executor
.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
ncData.put("stats", executor.submit(() -> fixupKeys(processNodeStats(hcc, nc))));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java
index ac79088..7fe7370 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java
@@ -31,10 +31,11 @@
import javax.imageio.ImageIO;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -53,7 +54,7 @@
String requestURI = request.getHttpRequest().uri();
if ("/".equals(requestURI)) {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML);
resourcePath = "/feed/home.html";
} else {
resourcePath = requestURI;
@@ -71,12 +72,12 @@
BufferedImage img = ImageIO.read(is);
OutputStream outputStream = response.outputStream();
String formatName = "png";
- IServletResponse.setContentType(response, IServlet.ContentType.IMG_PNG);
+ ServletUtils.setContentType(response, IServlet.ContentType.IMG_PNG);
ImageIO.write(img, formatName, outputStream);
return;
}
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
InputStreamReader isr = new InputStreamReader(is);
StringBuilder sb = new StringBuilder();
BufferedReader br = new BufferedReader(isr);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java
index 7788136..9c08fbd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java
@@ -23,7 +23,7 @@
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.api.IServletRequest;
public class FullApiServlet extends RestApiServlet {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index c1423e7..2dbaa54 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -31,9 +31,10 @@
import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -66,7 +67,7 @@
} else {
json = processNode(request, hcc);
}
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
} catch (IllegalStateException e) { // NOSONAR - exception not logged or rethrown
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
@@ -80,8 +81,7 @@
responseWriter.flush();
}
- private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc)
- throws Exception {
+ private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc) throws Exception {
String pathInfo = path(request);
if (pathInfo.endsWith("/")) {
throw new IllegalArgumentException();
@@ -172,7 +172,7 @@
}
final JsonNode value = valueArray.get(index);
json.remove(key);
- json.set(key.replaceAll("s$",""), value);
+ json.set(key.replaceAll("s$", ""), value);
}
}
}
@@ -205,8 +205,7 @@
String dump = hcc.getThreadDump(node);
if (dump == null) {
// check to see if this is a node that is simply down
- throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null
- ? new IllegalStateException()
+ throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null ? new IllegalStateException()
: new IllegalArgumentException();
}
return (ObjectNode) om.readTree(dump);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java
index 917d9a8..160c801 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java
@@ -23,7 +23,7 @@
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.api.IServletRequest;
public class QueryApiServlet extends RestApiServlet {
private static final byte ALLOWED_CATEGORIES = Statement.Category.QUERY;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 6240f51..06ddf44 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -39,10 +39,11 @@
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -67,7 +68,7 @@
response.setStatus(HttpResponseStatus.OK);
// TODO this seems wrong ...
try {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure setting content type", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index b302bab..43530ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -53,10 +53,11 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -74,8 +75,7 @@
private final IStatementExecutorFactory statementExecutorFactory;
public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths,
- ILangCompilationProvider compilationProvider,
- IStatementExecutorFactory statementExecutorFactory) {
+ ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory) {
super(ctx, paths);
this.compilationProvider = compilationProvider;
this.statementExecutorFactory = statementExecutorFactory;
@@ -325,8 +325,8 @@
SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(",\n");
SessionConfig.OutputFormat format = getFormat(param.format);
- SessionConfig sessionConfig = new SessionConfig(resultWriter, format, resultPrefix, resultPostfix,
- handlePrefix, handlePostfix);
+ SessionConfig sessionConfig =
+ new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, handlePrefix, handlePostfix);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
@@ -482,7 +482,7 @@
QueryTranslator.ResultDelivery delivery = parseResultDelivery(param.mode);
SessionConfig sessionConfig = createSessionConfig(param, resultWriter);
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
HttpResponseStatus status = HttpResponseStatus.OK;
Stats stats = new Stats();
@@ -517,8 +517,8 @@
IParser parser = compilationProvider.getParserFactory().createParser(param.statement);
List<Statement> statements = parser.parse();
MetadataManager.INSTANCE.init();
- IStatementExecutor translator = statementExecutorFactory.create(statements, sessionConfig,
- compilationProvider);
+ IStatementExecutor translator =
+ statementExecutorFactory.create(statements, sessionConfig, compilationProvider);
execStart = System.nanoTime();
translator.compileAndExecute(hcc, hds, delivery, stats);
execEnd = System.nanoTime();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index 197a39c..039c740 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -33,10 +33,11 @@
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -60,7 +61,7 @@
}
response.setStatus(HttpResponseStatus.OK);
try {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure setting content type", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java
index d924cf1..fac5883 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java
@@ -29,10 +29,11 @@
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -70,7 +71,7 @@
response.setStatus(HttpResponseStatus.OK);
if ("/".equals(requestURI)) {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML);
resourcePath = "/queryui/queryui.html";
} else {
resourcePath = requestURI;
@@ -87,7 +88,7 @@
String mime = IServlet.ContentType.mime(extension);
if (mime != null) {
OutputStream out = response.outputStream();
- IServletResponse.setContentType(response, mime);
+ ServletUtils.setContentType(response, mime);
try {
IOUtils.copy(is, out);
} catch (Exception e) {
@@ -106,7 +107,7 @@
}
private void doPost(IServletResponse response) throws IOException {
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
ExternalProperties externalProperties = AppContextInfo.INSTANCE.getExternalProperties();
response.setStatus(HttpResponseStatus.OK);
ObjectMapper om = new ObjectMapper();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index c2d1d33..787ff47 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -40,6 +40,7 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
@@ -47,11 +48,13 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -78,17 +81,13 @@
* based on the Accept: header and other servlet parameters.
*/
static SessionConfig initResponse(IServletRequest request, IServletResponse response) throws IOException {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8);
-
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8);
// CLEAN_JSON output is the default; most generally useful for a
// programmatic HTTP API
OutputFormat format = OutputFormat.CLEAN_JSON;
// First check the "output" servlet parameter.
String output = request.getParameter("output");
- String accept = request.getHeader("Accept");
- if (accept == null) {
- accept = "";
- }
+ String accept = request.getHeader("Accept", "");
if (output != null) {
if ("CSV".equals(output)) {
format = OutputFormat.CSV;
@@ -114,22 +113,12 @@
SessionConfig.ResultDecorator handlePrefix =
(AlgebricksAppendable app) -> app.append("{ \"").append("handle").append("\": ");
SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(" }");
-
SessionConfig sessionConfig =
new SessionConfig(response.writer(), format, null, null, handlePrefix, handlePostfix);
// If it's JSON or ADM, check for the "wrapper-array" flag. Default is
// "true" for JSON and "false" for ADM. (Not applicable for CSV.)
- boolean wrapperArray;
- switch (format) {
- case CLEAN_JSON:
- case LOSSLESS_JSON:
- wrapperArray = true;
- break;
- default:
- wrapperArray = false;
- break;
- }
+ boolean wrapperArray = format == OutputFormat.CLEAN_JSON || format == OutputFormat.LOSSLESS_JSON;
String wrapperParam = request.getParameter("wrapper-array");
if (wrapperParam != null) {
wrapperArray = Boolean.valueOf(wrapperParam);
@@ -142,22 +131,24 @@
// Now that format is set, output the content-type
switch (format) {
case ADM:
- IServletResponse.setContentType(response, "application/x-adm");
+ ServletUtils.setContentType(response, "application/x-adm");
break;
case CLEAN_JSON:
// No need to reflect "clean-ness" in output type; fall through
case LOSSLESS_JSON:
- IServletResponse.setContentType(response, "application/json");
+ ServletUtils.setContentType(response, "application/json");
break;
case CSV:
// Check for header parameter or in Accept:.
if ("present".equals(request.getParameter("header")) || accept.contains("header=present")) {
- IServletResponse.setContentType(response, "text/csv; header=present");
+ ServletUtils.setContentType(response, "text/csv; header=present");
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, true);
} else {
- IServletResponse.setContentType(response, "text/csv; header=absent");
+ ServletUtils.setContentType(response, "text/csv; header=absent");
}
break;
+ default:
+ throw new IOException("Unknown format " + format);
}
return sessionConfig;
}
@@ -169,45 +160,49 @@
// enable cross-origin resource sharing
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
-
SessionConfig sessionConfig = initResponse(request, response);
QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
- try {
- response.setStatus(HttpResponseStatus.OK);
- IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- synchronized (ctx) {
- hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
- ctx.put(HYRACKS_DATASET_ATTR, hds);
- }
+ doHandle(response, query, sessionConfig, resultDelivery);
+ } catch (Exception e) {
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ LOGGER.log(Level.WARNING, "Failure handling request", e);
+ return;
+ }
+ }
+
+ private void doHandle(IServletResponse response, String query, SessionConfig sessionConfig,
+ ResultDelivery resultDelivery) throws JsonProcessingException {
+ try {
+ response.setStatus(HttpResponseStatus.OK);
+ IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ synchronized (ctx) {
+ hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+ ctx.put(HYRACKS_DATASET_ATTR, hds);
}
}
- IParser parser = parserFactory.createParser(query);
- List<Statement> aqlStatements = parser.parse();
- validate(aqlStatements);
- MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
- statementExecutorFactory.create(aqlStatements, sessionConfig, compilationProvider);
- translator.compileAndExecute(hcc, hds, resultDelivery);
- } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
- String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query);
- ObjectNode errorResp =
- ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe));
- sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp));
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- } catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
- ResultUtil.apiErrorHandler(sessionConfig.out(), e);
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failure handling request", e);
+ IParser parser = parserFactory.createParser(query);
+ List<Statement> aqlStatements = parser.parse();
+ validate(aqlStatements);
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator =
+ statementExecutorFactory.create(aqlStatements, sessionConfig, compilationProvider);
+ translator.compileAndExecute(hcc, hds, resultDelivery);
+ } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- return;
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+ String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query);
+ ObjectNode errorResp =
+ ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe));
+ sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp));
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ ResultUtil.apiErrorHandler(sessionConfig.out(), e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index 25be651..dc48288 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -30,10 +30,11 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -70,8 +71,7 @@
}, "Shutdown Servlet Worker");
try {
- IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON,
- IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure handling request", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java
index 1c173d2..0a0e680 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java
@@ -23,7 +23,7 @@
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.api.IServletRequest;
public class UpdateApiServlet extends RestApiServlet {
private static final byte ALLOWED_CATEGORIES = Statement.Category.QUERY | Statement.Category.UPDATE;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
index e5ceccc..5899660 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
@@ -28,10 +28,11 @@
import java.util.logging.Logger;
import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.http.server.IServlet;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.server.util.ServletUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -61,7 +62,7 @@
responseObject.put(e.getKey(), e.getValue());
}
try {
- IServletResponse.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8);
+ ServletUtils.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure handling request", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 3a8852e..b17a722 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -72,8 +72,8 @@
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.server.HttpServer;
-import org.apache.hyracks.http.server.IServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlet.ServletMapping;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java
index b1fdab5..d34a9cf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java
@@ -43,8 +43,8 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.junit.Assert;
import org.junit.Test;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java
index 936b717..b482948 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java
@@ -35,8 +35,8 @@
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.asterix.test.runtime.SqlppExecutionTest;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.http.server.IServletRequest;
-import org.apache.hyracks.http.server.IServletResponse;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
import org.junit.Assert;
import org.junit.Test;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
similarity index 98%
rename from hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServlet.java
rename to hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
index 5691fd9..b079d36 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.http.server;
+package org.apache.hyracks.http.api;
import java.util.concurrent.ConcurrentMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
similarity index 67%
rename from hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletRequest.java
rename to hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
index 8aebd07..610c3d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
@@ -16,10 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.http.server;
-
-import java.util.List;
-import java.util.Map;
+package org.apache.hyracks.http.api;
import io.netty.handler.codec.http.FullHttpRequest;
@@ -34,6 +31,7 @@
/**
* Get a request parameter
+ *
* @param name
* @return the parameter or null if not found
*/
@@ -41,23 +39,21 @@
/**
* Get a request header
+ *
* @param name
* @return the header or null if not found
*/
String getHeader(CharSequence name);
- static String getParameter(Map<String, List<String>> parameters, CharSequence name) {
- List<String> parameter = parameters.get(name);
- if (parameter == null) {
- return null;
- } else if (parameter.size() == 1) {
- return parameter.get(0);
- } else {
- StringBuilder aString = new StringBuilder(parameter.get(0));
- for (int i = 1; i < parameter.size(); i++) {
- aString.append(",").append(parameter.get(i));
- }
- return aString.toString();
- }
+ /**
+ * Get a request header if found, return the default value, otherwise
+ *
+ * @param name
+ * @param defaultValue
+ * @return the header or defaultValue if not found
+ */
+ default String getHeader(CharSequence name, String defaultValue) {
+ String value = getHeader(name);
+ return value == null ? defaultValue : value;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
similarity index 60%
rename from hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
rename to hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
index 342e643..1a7c65f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.http.server;
+package org.apache.hyracks.http.api;
import java.io.Closeable;
import java.io.IOException;
@@ -24,54 +24,62 @@
import java.io.PrintWriter;
import io.netty.channel.ChannelFuture;
-import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
/**
- * A response to an instance of IServLetRequest
+ * A response to an instance of IServletRequest
*/
public interface IServletResponse extends Closeable {
/**
* Set a response header
+ *
* @param name
+ * the name of the header
* @param value
+ * the value of the header
* @return
- * @throws Exception
+ * the servlet response with the header set
+ * @throws IOException
*/
IServletResponse setHeader(CharSequence name, Object value) throws IOException;
/**
- * Get the output writer for the response
- * @return
- * @throws Exception
- */
- PrintWriter writer();
-
- /**
- * Send the last http response if any and return the future
- * @return
- * @throws Exception
- */
- ChannelFuture future() throws IOException;
-
- /**
* Set the status of the http response
+ *
* @param status
*/
void setStatus(HttpResponseStatus status);
/**
+ * Get the output writer for the response which writes to the response output stream
+ *
+ * @return the response writer
+ */
+ PrintWriter writer();
+
+ /**
* Get the output stream for the response
- * @return
+ *
+ * @return the response output stream
*/
OutputStream outputStream();
- public static void setContentType(IServletResponse response, String type, String charset) throws IOException {
- response.setHeader(HttpHeaderNames.CONTENT_TYPE, type + "; charset=" + charset);
- }
+ /**
+ * Get last content future
+ * Must only be called after the servlet response has been closed
+ * Used to listen to events about the last content sent through the network
+ * For example, to close the connection after the event has been completed
+ * lastContentFuture().addListener(ChannelFutureListener.CLOSE);
+ *
+ * @return
+ * @throws IOException
+ */
+ ChannelFuture lastContentFuture() throws IOException;
- public static void setContentType(IServletResponse response, String type) throws IOException {
- response.setHeader(HttpHeaderNames.CONTENT_TYPE, type);
- }
+ /**
+ * Notifies the response that the channel has become writable
+ * became writable or unwritable. Used for flow control
+ */
+ void notifyChannelWritable();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 22bbc50..7d24994 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -20,6 +20,9 @@
import java.util.concurrent.ConcurrentMap;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+
public abstract class AbstractServlet implements IServlet {
protected final String[] paths;
protected final ConcurrentMap<String, Object> ctx;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 984122b..65cdd52 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -28,26 +30,24 @@
public class ChunkedNettyOutputStream extends OutputStream {
+ private static final Logger LOGGER = Logger.getLogger(ChunkedNettyOutputStream.class.getName());
private final ChannelHandlerContext ctx;
private final ChunkedResponse response;
private ByteBuf buffer;
- public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize,
- ChunkedResponse response) {
+ public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize, ChunkedResponse response) {
this.response = response;
this.ctx = ctx;
buffer = ctx.alloc().buffer(chunkSize);
}
@Override
- public synchronized void write(byte[] b, int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length)) {
+ public void write(byte[] b, int off, int len) throws IOException {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
-
if (len > buffer.capacity()) {
flush();
flush(b, off, len);
@@ -64,45 +64,68 @@
}
@Override
- public synchronized void write(int b) {
- if (buffer.writableBytes() > 0) {
- buffer.writeByte(b);
- } else {
+ public void write(int b) throws IOException {
+ if (!buffer.isWritable()) {
flush();
- buffer.writeByte(b);
}
+ buffer.writeByte(b);
}
@Override
- public synchronized void close() throws IOException {
- flush();
- buffer.release();
+ public void close() throws IOException {
+ if (response.isHeaderSent() || response.status() != HttpResponseStatus.OK) {
+ flush();
+ buffer.release();
+ } else {
+ response.fullReponse(buffer);
+ }
super.close();
}
@Override
- public synchronized void flush() {
+ public void flush() throws IOException {
+ ensureWritable();
if (buffer.readableBytes() > 0) {
- int size = buffer.capacity();
if (response.status() == HttpResponseStatus.OK) {
- response.flush();
+ int size = buffer.capacity();
+ response.beforeFlush();
DefaultHttpContent content = new DefaultHttpContent(buffer);
- ctx.write(content);
+ ctx.write(content, ctx.channel().voidPromise());
+ buffer = ctx.alloc().buffer(size);
} else {
- response.error(buffer);
+ ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes());
+ aBuffer.writeBytes(buffer);
+ response.error(aBuffer);
}
- buffer = ctx.alloc().buffer(size);
}
}
- private synchronized void flush(byte[] buf, int offset, int len) {
+ private void flush(byte[] buf, int offset, int len) throws IOException {
+ ensureWritable();
ByteBuf aBuffer = ctx.alloc().buffer(len);
aBuffer.writeBytes(buf, offset, len);
if (response.status() == HttpResponseStatus.OK) {
- response.flush();
- ctx.write(new DefaultHttpContent(aBuffer));
+ response.beforeFlush();
+ ctx.write(new DefaultHttpContent(aBuffer), ctx.channel().voidPromise());
} else {
response.error(aBuffer);
}
}
+
+ private synchronized void ensureWritable() throws IOException {
+ while (!ctx.channel().isWritable()) {
+ try {
+ ctx.flush();
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.log(Level.WARNING, "Interupted while waiting for channel to be writable", e);
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public synchronized void resume() {
+ notifyAll();
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 19c2664..cb0cd80 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -18,25 +18,44 @@
*/
package org.apache.hyracks.http.server;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
+import org.apache.hyracks.http.api.IServletResponse;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
+/**
+ * A chunked http response. Here is how it is expected to work:
+ * If the response is a success aka 200 and is less than chunkSize, then it is sent as a single http response message
+ * If the response is larger than the chunkSize and the response status is 200, then it is sent as chunks of chunkSize.
+ * If the response status is non 200, then it is always sent as a single http response message.
+ * If the response status is non 200, then output buffered before setting the response status is discarded.
+ * If flush() is called on the writer and even if it is less than chunkSize, then the initial response will be sent
+ * with headers, followed by the buffered bytes as the first chunk.
+ * When chunking, an output buffer is allocated only when the previous buffer has been sent
+ * If an error occurs after sending the first chunk, the connection will close abruptly.
+ *
+ * Here is a breakdown of the possible cases.
+ * 1. smaller than chunkSize, no error -> full response
+ * 2. smaller than chunkSize, error -> full response
+ * 3. larger than chunkSize, error after header-> close connection. release buffer and release error
+ * 4. larger than chunkSize, no error. -> header, data, empty response
+ */
public class ChunkedResponse implements IServletResponse {
private final ChannelHandlerContext ctx;
private final ChunkedNettyOutputStream outputStream;
@@ -45,12 +64,13 @@
private boolean headerSent;
private ByteBuf error;
private ChannelFuture future;
+ private boolean done;
- public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
+ public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
this.ctx = ctx;
- outputStream = new ChunkedNettyOutputStream(ctx, 4096, this);
+ outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
writer = new PrintWriter(outputStream);
- response = new DefaultHttpResponse(HTTP_1_1, OK);
+ response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
if (HttpUtil.isKeepAlive(request)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
@@ -67,7 +87,7 @@
}
@Override
- public ChannelFuture future() {
+ public ChannelFuture lastContentFuture() {
return future;
}
@@ -78,25 +98,45 @@
@Override
public void close() throws IOException {
- if (error == null) {
- writer.close();
- future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ writer.close();
+ if (error == null && response.status() == HttpResponseStatus.OK) {
+ if (!done) {
+ future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+ } else {
+ // There was an error
+ if (headerSent) {
+ if (error != null) {
+ error.release();
+ }
+ future = ctx.channel().close();
+ } else {
+ // we didn't send anything to the user, we need to send an unchunked error response
+ fullResponse(response.protocolVersion(), response.status(),
+ error == null ? ctx.alloc().buffer(0, 0) : error, response.headers());
+ }
}
+ done = true;
}
public HttpResponseStatus status() {
return response.status();
}
- public void flush() {
+ public void beforeFlush() {
if (!headerSent && response.status() == HttpResponseStatus.OK) {
- ctx.writeAndFlush(response);
+ ctx.write(response, ctx.channel().voidPromise());
headerSent = true;
}
}
public void error(ByteBuf error) {
- this.error = error;
+ if (this.error == null) {
+ this.error = error;
+ } else {
+ this.error.capacity(this.error.capacity() + error.capacity());
+ this.error.writeBytes(error);
+ }
}
@Override
@@ -106,8 +146,28 @@
@Override
public void setStatus(HttpResponseStatus status) {
- // update the response
- // close the stream
- // write the response
+ response.setStatus(status);
+ }
+
+ public boolean isHeaderSent() {
+ return headerSent;
+ }
+
+ public void fullReponse(ByteBuf buffer) {
+ fullResponse(response.protocolVersion(), response.status(), buffer, response.headers());
+ }
+
+ private void fullResponse(HttpVersion version, HttpResponseStatus status, ByteBuf buffer, HttpHeaders headers) {
+ DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(version, status, buffer);
+ fullResponse.headers().set(headers);
+ fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
+ future = ctx.writeAndFlush(fullResponse);
+ headerSent = true;
+ done = true;
+ }
+
+ @Override
+ public void notifyChannelWritable() {
+ outputStream.resume();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 245f28a..1023686 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -23,6 +23,8 @@
import java.io.OutputStream;
import java.io.PrintWriter;
+import org.apache.hyracks.http.api.IServletResponse;
+
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -80,7 +82,7 @@
}
@Override
- public ChannelFuture future() throws IOException {
+ public ChannelFuture lastContentFuture() throws IOException {
return future;
}
@@ -93,4 +95,10 @@
public void setStatus(HttpResponseStatus status) {
response.setStatus(status);
}
+
+ @Override
+ public void notifyChannelWritable() {
+ // Do nothing.
+ // This response is sent as a single piece
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java
index 0b80a78..8d308d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java
@@ -21,6 +21,9 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.server.util.ServletUtils;
+
import io.netty.handler.codec.http.FullHttpRequest;
public class GetRequest implements IServletRequest {
@@ -39,12 +42,11 @@
@Override
public String getParameter(CharSequence name) {
- return IServletRequest.getParameter(parameters, name);
+ return ServletUtils.getParameter(parameters, name);
}
@Override
public String getHeader(CharSequence name) {
return request.headers().get(name);
}
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
new file mode 100644
index 0000000..418cd26
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+
+public class HttpRequestHandler implements Callable<Void> {
+ private static final Logger LOGGER = Logger.getLogger(HttpRequestHandler.class.getName());
+ private final ChannelHandlerContext ctx;
+ private final IServlet servlet;
+ private final IServletRequest request;
+ private final IServletResponse response;
+
+ public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, IServletRequest request, int chunkSize) {
+ this.ctx = ctx;
+ this.servlet = servlet;
+ this.request = request;
+ response = chunkSize == 0 ? new FullResponse(ctx, request.getHttpRequest())
+ : new ChunkedResponse(ctx, request.getHttpRequest(), chunkSize);
+ request.getHttpRequest().retain();
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ ChannelFuture lastContentFuture = handle();
+ if (!HttpUtil.isKeepAlive(request.getHttpRequest())) {
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ } catch (Throwable th) { //NOSONAR
+ LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", th);
+ ctx.close();
+ } finally {
+ request.getHttpRequest().release();
+ }
+ return null;
+ }
+
+ private ChannelFuture handle() throws IOException {
+ try {
+ servlet.handle(request, response);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failure during handling of an IServletRequest", e);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ } finally {
+ response.close();
+ }
+ return response.lastContentFuture();
+ }
+
+ public void notifyChannelWritable() {
+ response.notifyChannelWritable();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index f7f55bd..302e5f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -23,12 +23,19 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.http.api.IServlet;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.logging.LogLevel;
@@ -36,6 +43,8 @@
public class HttpServer {
// Constants
+ private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
+ private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName());
private static final int FAILED = -1;
private static final int STOPPED = 0;
@@ -44,23 +53,26 @@
private static final int STOPPING = 3;
// Final members
private final Object lock = new Object();
+ private final AtomicInteger threadId = new AtomicInteger();
private final ConcurrentMap<String, Object> ctx;
private final List<IServlet> lets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final int port;
+ private final ExecutorService executor;
// Mutable members
private volatile int state = STOPPED;
private Channel channel;
private Throwable cause;
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
- int port) {
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.port = port;
ctx = new ConcurrentHashMap<>();
lets = new ArrayList<>();
+ executor = Executors.newFixedThreadPool(16,
+ runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
}
public final void start() throws Exception { // NOSONAR
@@ -158,7 +170,6 @@
lets.add(let);
}
-
protected void doStart() throws InterruptedException {
/*
* This is a hacky way to ensure that ILets with more specific paths are checked first.
@@ -172,14 +183,13 @@
*/
Collections.sort(lets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new HttpServerInitializer(this));
+ b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+ new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK))
+ .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpServerInitializer(this));
channel = b.bind(port).sync().channel();
}
-
protected void doStop() throws InterruptedException {
channel.close();
channel.closeFuture().sync();
@@ -212,14 +222,18 @@
return true;
}
} else if (c == '*') {
- return path.regionMatches(path.length() - pathSpec.length() + 1,
- pathSpec, 1, pathSpec.length() - 1);
+ return path.regionMatches(path.length() - pathSpec.length() + 1, pathSpec, 1, pathSpec.length() - 1);
}
return false;
}
+
private static boolean isPathWildcardMatch(String pathSpec, String path) {
int cpl = pathSpec.length() - 2;
return (pathSpec.endsWith("/*") && path.regionMatches(0, pathSpec, 0, cpl))
&& (path.length() == cpl || '/' == path.charAt(cpl));
}
+
+ public ExecutorService getExecutor() {
+ return executor;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index c8ed937..2268c2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -18,13 +18,12 @@
*/
package org.apache.hyracks.http.server;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
-import io.netty.channel.ChannelFuture;
+import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.http.server.util.ServletUtils;
+
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -32,21 +31,17 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.codec.http.multipart.Attribute;
-import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
-import io.netty.handler.codec.http.multipart.InterfaceHttpData;
-import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
-import io.netty.handler.codec.http.multipart.MixedAttribute;
public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = Logger.getLogger(HttpServerHandler.class.getName());
protected final HttpServer server;
+ protected final int chunkSize;
+ protected HttpRequestHandler handler;
- public HttpServerHandler(HttpServer server) {
+ public HttpServerHandler(HttpServer server, int chunkSize) {
this.server = server;
+ this.chunkSize = chunkSize;
}
@Override
@@ -55,35 +50,29 @@
}
@Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ if (ctx.channel().isWritable()) {
+ handler.notifyChannelWritable();
+ }
+ super.channelWritabilityChanged(ctx);
+ }
+
+ @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
try {
- FullHttpRequest http = (FullHttpRequest) msg;
- IServlet servlet = server.getServlet(http);
+ FullHttpRequest request = (FullHttpRequest) msg;
+ IServlet servlet = server.getServlet(request);
if (servlet == null) {
- DefaultHttpResponse response = new DefaultHttpResponse(http.protocolVersion(),
- HttpResponseStatus.NOT_FOUND);
- ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+ DefaultHttpResponse notFound =
+ new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.NOT_FOUND);
+ ctx.write(notFound).addListener(ChannelFutureListener.CLOSE);
+ } else if (request.method() != HttpMethod.GET && request.method() != HttpMethod.POST) {
+ DefaultHttpResponse notAllowed =
+ new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED);
+ ctx.write(notAllowed).addListener(ChannelFutureListener.CLOSE);
} else {
- if (http.method() != HttpMethod.GET && http.method() != HttpMethod.POST) {
- DefaultHttpResponse response = new DefaultHttpResponse(http.protocolVersion(),
- HttpResponseStatus.METHOD_NOT_ALLOWED);
- ctx.write(response).addListener(ChannelFutureListener.CLOSE);
- return;
- }
- IServletRequest request = http.method() == HttpMethod.GET ? get(http) : post(http);
- IServletResponse response = new FullResponse(ctx, http);
- try {
- servlet.handle(request, response);
- } catch (Throwable th) { // NOSONAR
- LOGGER.log(Level.WARNING, "Failure during handling of an IServLetRequest", th);
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- } finally {
- response.close();
- }
- ChannelFuture lastContentFuture = response.future();
- if (!HttpUtil.isKeepAlive(http)) {
- lastContentFuture.addListener(ChannelFutureListener.CLOSE);
- }
+ handler = new HttpRequestHandler(ctx, servlet, ServletUtils.toServletRequest(request), chunkSize);
+ server.getExecutor().submit(handler);
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", e);
@@ -91,38 +80,6 @@
}
}
- public static IServletRequest post(FullHttpRequest request) throws IOException {
- List<String> names = new ArrayList<>();
- List<String> values = new ArrayList<>();
- HttpPostRequestDecoder decoder = null;
- try {
- decoder = new HttpPostRequestDecoder(request);
- } catch (Exception e) {
- //ignore. this means that the body of the POST request does not have key value pairs
- LOGGER.log(Level.WARNING, "Failed to decode a post message. Fix the API not to have queries as POST body",
- e);
- }
- if (decoder != null) {
- try {
- List<InterfaceHttpData> bodyHttpDatas = decoder.getBodyHttpDatas();
- for (InterfaceHttpData data : bodyHttpDatas) {
- if (data.getHttpDataType().equals(HttpDataType.Attribute)) {
- Attribute attr = (MixedAttribute) data;
- names.add(data.getName());
- values.add(attr.getValue());
- }
- }
- } finally {
- decoder.destroy();
- }
- }
- return new PostRequest(request, new QueryStringDecoder(request.uri()).parameters(), names, values);
- }
-
- public static IServletRequest get(FullHttpRequest request) throws IOException {
- return new GetRequest(request, new QueryStringDecoder(request.uri()).parameters());
- }
-
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index 3b32ee6..bc67865 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -27,9 +27,10 @@
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
- private static final int MAX_CHUNK_SIZE = 262144;
- private static final int MAX_HEADER_SIZE = 262144;
- private static final int MAX_INITIAL_LINE_LENGTH = 131072;
+ private static final int MAX_REQUEST_CHUNK_SIZE = 262144;
+ private static final int MAX_REQUEST_HEADER_SIZE = 262144;
+ private static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
+ private static final int RESPONSE_CHUNK_SIZE = 4096;
private HttpServer server;
public HttpServerInitializer(HttpServer server) {
@@ -39,9 +40,10 @@
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
- p.addLast(new HttpRequestDecoder(MAX_INITIAL_LINE_LENGTH, MAX_HEADER_SIZE, MAX_CHUNK_SIZE));
+ p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, MAX_REQUEST_HEADER_SIZE,
+ MAX_REQUEST_CHUNK_SIZE));
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
- p.addLast(new HttpServerHandler(server));
+ p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java
index 99f338c..338ef40 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java
@@ -21,6 +21,9 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.server.util.ServletUtils;
+
import io.netty.handler.codec.http.FullHttpRequest;
public class PostRequest implements IServletRequest {
@@ -49,7 +52,7 @@
return values.get(i);
}
}
- return IServletRequest.getParameter(parameters, name);
+ return ServletUtils.getParameter(parameters, name);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java
new file mode 100644
index 0000000..1ffab6d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.GetRequest;
+import org.apache.hyracks.http.server.PostRequest;
+
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.multipart.Attribute;
+import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
+import io.netty.handler.codec.http.multipart.MixedAttribute;
+
+public class ServletUtils {
+ private static final Logger LOGGER = Logger.getLogger(ServletUtils.class.getName());
+
+ private ServletUtils() {
+ }
+
+ public static String getParameter(Map<String, List<String>> parameters, CharSequence name) {
+ List<String> parameter = parameters.get(name);
+ if (parameter == null) {
+ return null;
+ } else if (parameter.size() == 1) {
+ return parameter.get(0);
+ } else {
+ StringBuilder aString = new StringBuilder(parameter.get(0));
+ for (int i = 1; i < parameter.size(); i++) {
+ aString.append(",").append(parameter.get(i));
+ }
+ return aString.toString();
+ }
+ }
+
+ public static IServletRequest post(FullHttpRequest request) throws IOException {
+ List<String> names = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+ HttpPostRequestDecoder decoder = null;
+ try {
+ decoder = new HttpPostRequestDecoder(request);
+ } catch (Exception e) {
+ //ignore. this means that the body of the POST request does not have key value pairs
+ LOGGER.log(Level.WARNING, "Failed to decode a post message. Fix the API not to have queries as POST body",
+ e);
+ }
+ if (decoder != null) {
+ try {
+ List<InterfaceHttpData> bodyHttpDatas = decoder.getBodyHttpDatas();
+ for (InterfaceHttpData data : bodyHttpDatas) {
+ if (data.getHttpDataType().equals(HttpDataType.Attribute)) {
+ Attribute attr = (MixedAttribute) data;
+ names.add(data.getName());
+ values.add(attr.getValue());
+ }
+ }
+ } finally {
+ decoder.destroy();
+ }
+ }
+ return new PostRequest(request, new QueryStringDecoder(request.uri()).parameters(), names, values);
+ }
+
+ public static IServletRequest get(FullHttpRequest request) throws IOException {
+ return new GetRequest(request, new QueryStringDecoder(request.uri()).parameters());
+ }
+
+ public static IServletRequest toServletRequest(FullHttpRequest request) throws IOException {
+ return request.method() == HttpMethod.GET ? ServletUtils.get(request) : ServletUtils.post(request);
+ }
+
+ public static void setContentType(IServletResponse response, String type, String charset) throws IOException {
+ response.setHeader(HttpHeaderNames.CONTENT_TYPE, type + "; charset=" + charset);
+ }
+
+ public static void setContentType(IServletResponse response, String type) throws IOException {
+ response.setHeader(HttpHeaderNames.CONTENT_TYPE, type);
+ }
+}