[ASTERIXDB-1955][CLUS][RT] Refactor, Keep-Alive
- Add getFaultToleranceStrategy to ICcApplicationContext interface,
eliminate casts to implementation where interface suffices
- Minor refactoring in ResultUtil
- Don't close keep-alive connections in case of Unauthorized (401)
status
Change-Id: Id75dd55861976390b1098a496ff2c0345c991389
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1855
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index f004446..fa2f667 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -116,19 +116,26 @@
}
public static void printError(PrintWriter pw, Throwable e, boolean comma) {
+ printError(pw, e, 1, comma);
+ }
+
+ public static void printError(PrintWriter pw, Throwable e, int code, boolean comma) {
Throwable rootCause = getRootCause(e);
- final boolean addStack = false;
- pw.print("\t\"");
- pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
- pw.print("\": [{ \n");
- printField(pw, QueryServiceServlet.ErrorField.CODE.str(), "1");
String msg = rootCause.getMessage();
if (!(rootCause instanceof AlgebricksException || rootCause instanceof HyracksException
|| rootCause instanceof TokenMgrError
|| rootCause instanceof org.apache.asterix.aqlplus.parser.TokenMgrError)) {
msg = rootCause.getClass().getSimpleName() + (msg == null ? "" : ": " + msg);
}
- printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil.escape(msg), addStack);
+ printError(pw, msg, code, comma);
+ }
+
+ public static void printError(PrintWriter pw, String msg, int code, boolean comma) {
+ pw.print("\t\"");
+ pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
+ pw.print("\": [{ \n");
+ printField(pw, QueryServiceServlet.ErrorField.CODE.str(), code);
+ printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil.escape(msg), false);
pw.print(comma ? "\t}],\n" : "\t}]\n");
}
@@ -136,6 +143,10 @@
printField(pw, name, value, true);
}
+ public static void printField(PrintWriter pw, String name, long value) {
+ printField(pw, name, value, true);
+ }
+
public static void printField(PrintWriter pw, String name, String value, boolean comma) {
printFieldInternal(pw, name, "\"" + value + "\"", comma);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 03c7ac6..a01d70a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -21,7 +21,6 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAddressedMessage {
@@ -38,7 +37,7 @@
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
+ appCtx.getFaultToleranceStrategy().process(this);
}
public String getNodeId() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
index cfe999c..21dee9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -54,7 +53,7 @@
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
+ appCtx.getFaultToleranceStrategy().process(this);
}
public SystemState getState() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 21afdf1..35e0466 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -62,6 +62,7 @@
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -342,7 +343,7 @@
}
@Override
- public CcApplicationContext getApplicationContext() {
+ public ICcApplicationContext getApplicationContext() {
return appCtx;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 76b3510..0bba635 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -36,7 +36,7 @@
/**
* extracts results from the response of the QueryServiceServlet.
* As the response is not necessarily valid JSON, non-JSON content has to be extracted in some cases.
- * The current implementation creates a toomany copies of the data to be usable for larger results.
+ * The current implementation creates a too many copies of the data to be usable for larger results.
*/
public class ResultExtractor {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index b7adfde..249bd56 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -57,6 +58,11 @@
IGlobalRecoveryManager getGlobalRecoveryManager();
/**
+ * @return the fault tolerance strategy in use for the cluster
+ */
+ IFaultToleranceStrategy getFaultToleranceStrategy();
+
+ /**
* @return the active lifecycle listener at Cluster controller
*/
IJobLifecycleListener getActiveLifecycleListener();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index f9b54dc..4cd243c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -204,6 +204,7 @@
return metadataBootstrapSupplier.get();
}
+ @Override
public IFaultToleranceStrategy getFaultToleranceStrategy() {
return ftStrategy;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 8fd444b..1c30991 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -66,6 +66,7 @@
private ByteBuf error;
private ChannelFuture future;
private boolean done;
+ private final boolean keepAlive;
public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
this.ctx = ctx;
@@ -73,7 +74,8 @@
writer = new PrintWriter(outputStream);
response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
- if (HttpUtil.isKeepAlive(request)) {
+ keepAlive = HttpUtil.isKeepAlive(request);
+ if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
}
@@ -112,13 +114,17 @@
}
future = ctx.channel().close();
} else {
+ if (keepAlive && response.status() != HttpResponseStatus.UNAUTHORIZED) {
+ response.headers().remove(HttpHeaderNames.CONNECTION);
+ }
// we didn't send anything to the user, we need to send an unchunked error response
fullResponse(response.protocolVersion(), response.status(),
error == null ? ctx.alloc().buffer(0, 0) : error, response.headers());
}
-
- // since the request failed, we need to close the channel on complete
- future.addListener(ChannelFutureListener.CLOSE);
+ if (response.status() != HttpResponseStatus.UNAUTHORIZED) {
+ // since the request failed, we need to close the channel on complete
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
}
done = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 1023686..598048e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -61,11 +61,15 @@
public void close() throws IOException {
writer.close();
FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray()));
- if (keepAlive && response.status() == HttpResponseStatus.OK) {
- fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
+ if (keepAlive) {
+ if (response.status() == HttpResponseStatus.OK || response.status() == HttpResponseStatus.UNAUTHORIZED) {
+ fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
+ } else {
+ fullResponse.headers().remove(HttpHeaderNames.CONNECTION);
+ }
}
future = ctx.writeAndFlush(fullResponse);
- if (response.status() != HttpResponseStatus.OK) {
+ if (response.status() != HttpResponseStatus.OK && response.status() != HttpResponseStatus.UNAUTHORIZED) {
future.addListener(ChannelFutureListener.CLOSE);
}
}