Merge "Merge branch 'gerrit/mad-hatter'" into cheshire-cat
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 99b6be1..0b0f262 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -68,7 +68,7 @@
     }
 
     @Override
-    protected void executeStatement(IRequestReference requestReference, String statementsText,
+    protected void executeStatement(IServletRequest request, IRequestReference requestReference, String statementsText,
             SessionOutput sessionOutput, ResultProperties resultProperties,
             IStatementExecutor.StatementProperties statementProperties, IStatementExecutor.Stats stats,
             QueryServiceRequestParameters param, RequestExecutionState executionState,
@@ -85,7 +85,7 @@
             long timeout = param.getTimeout();
             int stmtCategoryRestrictionMask = org.apache.asterix.app.translator.RequestParameters
                     .getStatementCategoryRestrictionMask(param.isReadOnly());
-            ExecuteStatementRequestMessage requestMsg = createRequestMessage(requestReference, statementsText,
+            ExecuteStatementRequestMessage requestMsg = createRequestMessage(request, requestReference, statementsText,
                     sessionOutput, resultProperties, param, optionalParameters, statementParameters, ncCtx,
                     responseFuture, queryLanguage, handleUrl, stmtCategoryRestrictionMask, false);
             executionState.start();
@@ -129,12 +129,12 @@
         buildResponseResults(responsePrinter, sessionOutput, responseMsg.getExecutionPlans(), warnings);
     }
 
-    protected ExecuteStatementRequestMessage createRequestMessage(IRequestReference requestReference,
-            String statementsText, SessionOutput sessionOutput, ResultProperties resultProperties,
-            QueryServiceRequestParameters param, Map<String, String> optionalParameters,
-            Map<String, byte[]> statementParameters, INCServiceContext ncCtx, MessageFuture responseFuture,
-            ILangExtension.Language queryLanguage, String handleUrl, int stmtCategoryRestrictionMask,
-            boolean forceDropDataset) {
+    protected ExecuteStatementRequestMessage createRequestMessage(IServletRequest request,
+            IRequestReference requestReference, String statementsText, SessionOutput sessionOutput,
+            ResultProperties resultProperties, QueryServiceRequestParameters param,
+            Map<String, String> optionalParameters, Map<String, byte[]> statementParameters, INCServiceContext ncCtx,
+            MessageFuture responseFuture, ILangExtension.Language queryLanguage, String handleUrl,
+            int stmtCategoryRestrictionMask, boolean forceDropDataset) {
         return new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
                 statementsText, sessionOutput.config(), resultProperties.getNcToCcResultProperties(),
                 param.getClientContextID(), handleUrl, optionalParameters, statementParameters,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index dea46f2..f394794 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -301,9 +301,9 @@
                 IStatementExecutor.StatementProperties statementProperties =
                         new IStatementExecutor.StatementProperties();
                 response.setStatus(HttpResponseStatus.OK);
-                executeStatement(requestRef, statementsText, sessionOutput, resultProperties, statementProperties,
-                        stats, param, executionState, param.getOptionalParams(), statementParams, responsePrinter,
-                        warnings);
+                executeStatement(request, requestRef, statementsText, sessionOutput, resultProperties,
+                        statementProperties, stats, param, executionState, param.getOptionalParams(), statementParams,
+                        responsePrinter, warnings);
                 executionState.setStatus(ResultStatus.SUCCESS, HttpResponseStatus.OK);
             }
             errorCount = 0;
@@ -386,7 +386,7 @@
         return new ResultUtil.ParseOnlyResult(extVars);
     }
 
-    protected void executeStatement(IRequestReference requestReference, String statementsText,
+    protected void executeStatement(IServletRequest request, IRequestReference requestReference, String statementsText,
             SessionOutput sessionOutput, ResultProperties resultProperties,
             IStatementExecutor.StatementProperties statementProperties, Stats stats,
             QueryServiceRequestParameters param, RequestExecutionState executionState,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 66a6dde..d3345fb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -72,8 +72,8 @@
     public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(Long.MAX_VALUE);
     //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063
     public static final long DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS = TimeUnit.MINUTES.toMillis(1);
-    private final String requestNodeId;
-    private final long requestMessageId;
+    protected final String requestNodeId;
+    protected final long requestMessageId;
     private final ILangExtension.Language lang;
     private final String statementsText;
     private final SessionConfig sessionConfig;
@@ -178,6 +178,11 @@
         }
     }
 
+    protected CCMessageBroker getMessageBroker(ICcApplicationContext ccAppCtx) {
+        ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+        return (CCMessageBroker) ccSrvContext.getMessageBroker();
+    }
+
     static RuntimeDataException getRejectionReason(ClusterControllerService ccSrv, String requestNodeId) {
         if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
             return new RuntimeDataException(ErrorCode.REJECT_NODE_UNREGISTERED);
@@ -191,7 +196,7 @@
         return null;
     }
 
-    static void sendRejection(RuntimeDataException reason, CCMessageBroker messageBroker, long requestMessageId,
+    protected static void sendRejection(Exception reason, CCMessageBroker messageBroker, long requestMessageId,
             String requestNodeId) {
         ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
         responseMsg.setError(reason);