[NO ISSUE][OTH] Add request parameter to allow for immediate execution

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Add a request parameter to allow certain queries to
run immediately.

Change-Id: I60208c07200326a4957e908d8e7c9dcdb7bc3204
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11145
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 4ad1040..5c743ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -73,4 +73,6 @@
      * inconsistent.
      */
     boolean isForceDropDataset();
+
+    boolean isSkipAdmissionPolicy();
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 083fa83..d60b791 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -273,6 +273,9 @@
      * @param statementParameters
      *            Statement parameters
      * @param statementRewriter
+     *            The statement rewriter
+     * @param requestParameters
+     *            The request parameters
      * @return the compiled {@code JobSpecification}
      * @throws AsterixException
      * @throws RemoteException
@@ -281,7 +284,8 @@
      */
     JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
             Query query, ICompiledDmlStatement dmlStatement, Map<String, IAObject> statementParameters,
-            IStatementRewriter statementRewriter) throws RemoteException, AlgebricksException, ACIDException;
+            IStatementRewriter statementRewriter, IRequestParameters requestParameters)
+            throws RemoteException, AlgebricksException, ACIDException;
 
     /**
      * returns the active dataverse for an entity or a statement
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b3c7f54..920b0d9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -77,6 +77,7 @@
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.ExecutionPlans;
+import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.SqlppExpressionToPlanTranslator;
@@ -205,7 +206,8 @@
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
             Query query, int varCounter, String outputDatasetName, SessionOutput output,
             ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer,
-            IWarningCollector warningCollector) throws AlgebricksException, ACIDException {
+            IWarningCollector warningCollector, IRequestParameters requestParameters)
+            throws AlgebricksException, ACIDException {
 
         // establish facts
         final boolean isQuery = query != null;
@@ -307,15 +309,17 @@
         JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
 
         if (isQuery) {
-            // Sets a required capacity, only for read-only queries.
-            // DDLs and DMLs are considered not that frequent.
-            // limit the computation locations to the locations that will be used in the query
-            final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
-            final AlgebricksAbsolutePartitionConstraint jobLocations =
-                    getJobLocations(spec, nodeJobTracker, computationLocations);
-            final IClusterCapacity jobRequiredCapacity =
-                    ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
-            spec.setRequiredClusterCapacity(jobRequiredCapacity);
+            if (requestParameters == null || !requestParameters.isSkipAdmissionPolicy()) {
+                // Sets a required capacity, only for read-only queries.
+                // DDLs and DMLs are considered not that frequent.
+                // limit the computation locations to the locations that will be used in the query
+                final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+                final AlgebricksAbsolutePartitionConstraint jobLocations =
+                        getJobLocations(spec, nodeJobTracker, computationLocations);
+                final IClusterCapacity jobRequiredCapacity =
+                        ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
+                spec.setRequiredClusterCapacity(jobRequiredCapacity);
+            }
         }
         if (isQuery && conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
             generateJob(spec);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index d3345fb..29ee76d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -87,12 +87,24 @@
     private final ProfileType profileType;
     private final IRequestReference requestReference;
     private final boolean forceDropDataset;
+    private final boolean skipAdmissionPolicy;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
             String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
             String clientContextID, String handleUrl, Map<String, String> optionalParameters,
             Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType,
             int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset) {
+        this(requestNodeId, requestMessageId, lang, statementsText, sessionConfig, resultProperties, clientContextID,
+                handleUrl, optionalParameters, statementParameters, multiStatement, profileType,
+                statementCategoryRestrictionMask, requestReference, forceDropDataset, false);
+    }
+
+    protected ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
+            String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
+            String clientContextID, String handleUrl, Map<String, String> optionalParameters,
+            Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType,
+            int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset,
+            boolean skipAdmissionPolicy) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
@@ -108,6 +120,7 @@
         this.profileType = profileType;
         this.requestReference = requestReference;
         this.forceDropDataset = forceDropDataset;
+        this.skipAdmissionPolicy = skipAdmissionPolicy;
     }
 
     @Override
@@ -150,9 +163,10 @@
             final IStatementExecutor.StatementProperties statementProperties =
                     new IStatementExecutor.StatementProperties();
             Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
-            final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null,
-                    resultProperties, stats, statementProperties, outMetadata, clientContextID, optionalParameters,
-                    stmtParams, multiStatement, statementCategoryRestrictionMask, forceDropDataset);
+            final IRequestParameters requestParameters =
+                    new RequestParameters(requestReference, statementsText, null, resultProperties, stats,
+                            statementProperties, outMetadata, clientContextID, optionalParameters, stmtParams,
+                            multiStatement, statementCategoryRestrictionMask, forceDropDataset, skipAdmissionPolicy);
             translator.compileAndExecute(ccApp.getHcc(), requestParameters);
             translator.getWarnings(warnings, maxWarnings - warnings.size());
             stats.updateTotalWarningsCount(parserTotalWarningsCount);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d49649d..754e2b6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2781,7 +2781,7 @@
                     loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
-                    null, responsePrinter, warningCollector);
+                    null, responsePrinter, warningCollector, null);
             afterCompile();
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2880,8 +2880,8 @@
             CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
                     datasetName, stmtDelete.getCondition(), stmtDelete.getVarCounter(), stmtDelete.getQuery());
             clfrqs.setSourceLocation(stmt.getSourceLocation());
-            JobSpecification jobSpec =
-                    rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams, stmtRewriter);
+            JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams,
+                    stmtRewriter, null);
             afterCompile();
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2904,7 +2904,7 @@
     @Override
     public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector,
             MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt,
-            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
+            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter, IRequestParameters requestParameters)
             throws AlgebricksException, ACIDException {
 
         Map<VarIdentifier, IAObject> externalVars = createExternalVariables(stmtParams, stmtRewriter);
@@ -2916,7 +2916,7 @@
         // Query Compilation (happens under the same ongoing metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
                 rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars,
-                responsePrinter, warningCollector);
+                responsePrinter, warningCollector, requestParameters);
     }
 
     private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -2955,7 +2955,7 @@
         // transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
                 rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter,
-                warningCollector);
+                warningCollector, null);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
@@ -3419,8 +3419,8 @@
             boolean bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             try {
-                final JobSpecification jobSpec =
-                        rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams, stmtRewriter);
+                final JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams,
+                        stmtRewriter, requestParameters);
                 // update stats with count of compile-time warnings. needs to be adapted for multi-statement.
                 stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
                 afterCompile();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index d5ea685..6c8f21c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -59,6 +59,7 @@
     private final int statementCategoryRestrictionMask;
     private final String statement;
     private final boolean forceDropDataset;
+    private final boolean skipAdmissionPolicy;
 
     public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
             ResultProperties resultProperties, Stats stats, StatementProperties statementProperties,
@@ -83,6 +84,16 @@
             IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
             Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement,
             int statementCategoryRestrictionMask, boolean forceDropDataset) {
+        this(requestReference, statement, resultSet, resultProperties, stats, statementProperties, outMetadata,
+                clientContextId, optionalParameters, statementParameters, multiStatement,
+                statementCategoryRestrictionMask, forceDropDataset, false);
+    }
+
+    public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
+            ResultProperties resultProperties, Stats stats, StatementProperties statementProperties,
+            IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
+            Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement,
+            int statementCategoryRestrictionMask, boolean forceDropDataset, boolean skipAdmissionPolicy) {
         this.requestReference = requestReference;
         this.statement = statement;
         this.resultSet = resultSet;
@@ -96,6 +107,7 @@
         this.multiStatement = multiStatement;
         this.statementCategoryRestrictionMask = statementCategoryRestrictionMask;
         this.forceDropDataset = forceDropDataset;
+        this.skipAdmissionPolicy = skipAdmissionPolicy;
     }
 
     @Override
@@ -149,6 +161,11 @@
     }
 
     @Override
+    public boolean isSkipAdmissionPolicy() {
+        return skipAdmissionPolicy;
+    }
+
+    @Override
     public Map<String, IAObject> getStatementParameters() {
         return statementParameters;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0..9472da5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -253,7 +253,7 @@
             clfrqs = new CompiledStatements.CompiledUpsertStatement(feedConn.getDataverseName(),
                     feedConn.getDatasetName(), feedConnQuery, stmtUpsert.getVarCounter(), null, null);
         }
-        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null);
+        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null, null);
     }
 
     private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,