[ASTERIXDB-1955][CLUS][RT] Refactor, Keep-Alive

- Add getFaultToleranceStrategy to ICcApplicationContext interface,
  eliminate casts to implementation where interface suffices
- Minor refactoring in ResultUtil
- Don't close keep-alive connections in case of Unauthorized (401)
  status

Change-Id: Id75dd55861976390b1098a496ff2c0345c991389
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1855
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index f004446..fa2f667 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -116,19 +116,26 @@
     }
 
     public static void printError(PrintWriter pw, Throwable e, boolean comma) {
+        printError(pw, e, 1, comma);
+    }
+
+    public static void printError(PrintWriter pw, Throwable e, int code, boolean comma) {
         Throwable rootCause = getRootCause(e);
-        final boolean addStack = false;
-        pw.print("\t\"");
-        pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
-        pw.print("\": [{ \n");
-        printField(pw, QueryServiceServlet.ErrorField.CODE.str(), "1");
         String msg = rootCause.getMessage();
         if (!(rootCause instanceof AlgebricksException || rootCause instanceof HyracksException
                 || rootCause instanceof TokenMgrError
                 || rootCause instanceof org.apache.asterix.aqlplus.parser.TokenMgrError)) {
             msg = rootCause.getClass().getSimpleName() + (msg == null ? "" : ": " + msg);
         }
-        printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil.escape(msg), addStack);
+        printError(pw, msg, code, comma);
+    }
+
+    public static void printError(PrintWriter pw, String msg, int code, boolean comma) {
+        pw.print("\t\"");
+        pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
+        pw.print("\": [{ \n");
+        printField(pw, QueryServiceServlet.ErrorField.CODE.str(), code);
+        printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil.escape(msg), false);
         pw.print(comma ? "\t}],\n" : "\t}]\n");
     }
 
@@ -136,6 +143,10 @@
         printField(pw, name, value, true);
     }
 
+    public static void printField(PrintWriter pw, String name, long value) {
+        printField(pw, name, value, true);
+    }
+
     public static void printField(PrintWriter pw, String name, String value, boolean comma) {
         printFieldInternal(pw, name, "\"" + value + "\"", comma);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 03c7ac6..a01d70a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -21,7 +21,6 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAddressedMessage {
@@ -38,7 +37,7 @@
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
+        appCtx.getFaultToleranceStrategy().process(this);
     }
 
     public String getNodeId() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
index cfe999c..21dee9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -54,7 +53,7 @@
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
+        appCtx.getFaultToleranceStrategy().process(this);
     }
 
     public SystemState getState() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 21afdf1..35e0466 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -62,6 +62,7 @@
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -342,7 +343,7 @@
     }
 
     @Override
-    public CcApplicationContext getApplicationContext() {
+    public ICcApplicationContext getApplicationContext() {
         return appCtx;
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 76b3510..0bba635 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -36,7 +36,7 @@
 /**
  * extracts results from the response of the QueryServiceServlet.
  * As the response is not necessarily valid JSON, non-JSON content has to be extracted in some cases.
- * The current implementation creates a toomany copies of the data to be usable for larger results.
+ * The current implementation creates a too many copies of the data to be usable for larger results.
  */
 public class ResultExtractor {
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index b7adfde..249bd56 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -57,6 +58,11 @@
     IGlobalRecoveryManager getGlobalRecoveryManager();
 
     /**
+     * @return the fault tolerance strategy in use for the cluster
+     */
+    IFaultToleranceStrategy getFaultToleranceStrategy();
+
+    /**
      * @return the active lifecycle listener at Cluster controller
      */
     IJobLifecycleListener getActiveLifecycleListener();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index f9b54dc..4cd243c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -204,6 +204,7 @@
         return metadataBootstrapSupplier.get();
     }
 
+    @Override
     public IFaultToleranceStrategy getFaultToleranceStrategy() {
         return ftStrategy;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 8fd444b..1c30991 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -66,6 +66,7 @@
     private ByteBuf error;
     private ChannelFuture future;
     private boolean done;
+    private final boolean keepAlive;
 
     public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
         this.ctx = ctx;
@@ -73,7 +74,8 @@
         writer = new PrintWriter(outputStream);
         response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
         response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
-        if (HttpUtil.isKeepAlive(request)) {
+        keepAlive = HttpUtil.isKeepAlive(request);
+        if (keepAlive) {
             response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
         }
     }
@@ -112,13 +114,17 @@
                 }
                 future = ctx.channel().close();
             } else {
+                if (keepAlive && response.status() != HttpResponseStatus.UNAUTHORIZED) {
+                    response.headers().remove(HttpHeaderNames.CONNECTION);
+                }
                 // we didn't send anything to the user, we need to send an unchunked error response
                 fullResponse(response.protocolVersion(), response.status(),
                         error == null ? ctx.alloc().buffer(0, 0) : error, response.headers());
             }
-
-            // since the request failed, we need to close the channel on complete
-            future.addListener(ChannelFutureListener.CLOSE);
+            if (response.status() != HttpResponseStatus.UNAUTHORIZED) {
+                // since the request failed, we need to close the channel on complete
+                future.addListener(ChannelFutureListener.CLOSE);
+            }
         }
         done = true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 1023686..598048e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -61,11 +61,15 @@
     public void close() throws IOException {
         writer.close();
         FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray()));
-        if (keepAlive && response.status() == HttpResponseStatus.OK) {
-            fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
+        if (keepAlive) {
+            if (response.status() == HttpResponseStatus.OK || response.status() == HttpResponseStatus.UNAUTHORIZED) {
+                fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
+            } else {
+                fullResponse.headers().remove(HttpHeaderNames.CONNECTION);
+            }
         }
         future = ctx.writeAndFlush(fullResponse);
-        if (response.status() != HttpResponseStatus.OK) {
+        if (response.status() != HttpResponseStatus.OK && response.status() != HttpResponseStatus.UNAUTHORIZED) {
             future.addListener(ChannelFutureListener.CLOSE);
         }
     }