[NO ISSUE][OTH] Add request parameter to allow for immediate execution
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Add a request parameter to allow certain queries to
run immediately.
Change-Id: I60208c07200326a4957e908d8e7c9dcdb7bc3204
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11145
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 4ad1040..5c743ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -73,4 +73,6 @@
* inconsistent.
*/
boolean isForceDropDataset();
+
+ boolean isSkipAdmissionPolicy();
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 083fa83..d60b791 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -273,6 +273,9 @@
* @param statementParameters
* Statement parameters
* @param statementRewriter
+ * The statement rewriter
+ * @param requestParameters
+ * The request parameters
* @return the compiled {@code JobSpecification}
* @throws AsterixException
* @throws RemoteException
@@ -281,7 +284,8 @@
*/
JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
Query query, ICompiledDmlStatement dmlStatement, Map<String, IAObject> statementParameters,
- IStatementRewriter statementRewriter) throws RemoteException, AlgebricksException, ACIDException;
+ IStatementRewriter statementRewriter, IRequestParameters requestParameters)
+ throws RemoteException, AlgebricksException, ACIDException;
/**
* returns the active dataverse for an entity or a statement
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b3c7f54..920b0d9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -77,6 +77,7 @@
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.ExecutionPlans;
+import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.translator.SqlppExpressionToPlanTranslator;
@@ -205,7 +206,8 @@
public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
Query query, int varCounter, String outputDatasetName, SessionOutput output,
ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer,
- IWarningCollector warningCollector) throws AlgebricksException, ACIDException {
+ IWarningCollector warningCollector, IRequestParameters requestParameters)
+ throws AlgebricksException, ACIDException {
// establish facts
final boolean isQuery = query != null;
@@ -307,15 +309,17 @@
JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
if (isQuery) {
- // Sets a required capacity, only for read-only queries.
- // DDLs and DMLs are considered not that frequent.
- // limit the computation locations to the locations that will be used in the query
- final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
- final AlgebricksAbsolutePartitionConstraint jobLocations =
- getJobLocations(spec, nodeJobTracker, computationLocations);
- final IClusterCapacity jobRequiredCapacity =
- ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
- spec.setRequiredClusterCapacity(jobRequiredCapacity);
+ if (requestParameters == null || !requestParameters.isSkipAdmissionPolicy()) {
+ // Sets a required capacity, only for read-only queries.
+ // DDLs and DMLs are considered not that frequent.
+ // limit the computation locations to the locations that will be used in the query
+ final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+ final AlgebricksAbsolutePartitionConstraint jobLocations =
+ getJobLocations(spec, nodeJobTracker, computationLocations);
+ final IClusterCapacity jobRequiredCapacity =
+ ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
+ spec.setRequiredClusterCapacity(jobRequiredCapacity);
+ }
}
if (isQuery && conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
generateJob(spec);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index d3345fb..29ee76d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -87,12 +87,24 @@
private final ProfileType profileType;
private final IRequestReference requestReference;
private final boolean forceDropDataset;
+ private final boolean skipAdmissionPolicy;
public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
String clientContextID, String handleUrl, Map<String, String> optionalParameters,
Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType,
int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset) {
+ this(requestNodeId, requestMessageId, lang, statementsText, sessionConfig, resultProperties, clientContextID,
+ handleUrl, optionalParameters, statementParameters, multiStatement, profileType,
+ statementCategoryRestrictionMask, requestReference, forceDropDataset, false);
+ }
+
+ protected ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
+ String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
+ String clientContextID, String handleUrl, Map<String, String> optionalParameters,
+ Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType,
+ int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset,
+ boolean skipAdmissionPolicy) {
this.requestNodeId = requestNodeId;
this.requestMessageId = requestMessageId;
this.lang = lang;
@@ -108,6 +120,7 @@
this.profileType = profileType;
this.requestReference = requestReference;
this.forceDropDataset = forceDropDataset;
+ this.skipAdmissionPolicy = skipAdmissionPolicy;
}
@Override
@@ -150,9 +163,10 @@
final IStatementExecutor.StatementProperties statementProperties =
new IStatementExecutor.StatementProperties();
Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
- final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null,
- resultProperties, stats, statementProperties, outMetadata, clientContextID, optionalParameters,
- stmtParams, multiStatement, statementCategoryRestrictionMask, forceDropDataset);
+ final IRequestParameters requestParameters =
+ new RequestParameters(requestReference, statementsText, null, resultProperties, stats,
+ statementProperties, outMetadata, clientContextID, optionalParameters, stmtParams,
+ multiStatement, statementCategoryRestrictionMask, forceDropDataset, skipAdmissionPolicy);
translator.compileAndExecute(ccApp.getHcc(), requestParameters);
translator.getWarnings(warnings, maxWarnings - warnings.size());
stats.updateTotalWarningsCount(parserTotalWarningsCount);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d49649d..754e2b6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2781,7 +2781,7 @@
loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
cls.setSourceLocation(stmt.getSourceLocation());
JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
- null, responsePrinter, warningCollector);
+ null, responsePrinter, warningCollector, null);
afterCompile();
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -2880,8 +2880,8 @@
CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
datasetName, stmtDelete.getCondition(), stmtDelete.getVarCounter(), stmtDelete.getQuery());
clfrqs.setSourceLocation(stmt.getSourceLocation());
- JobSpecification jobSpec =
- rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams, stmtRewriter);
+ JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams,
+ stmtRewriter, null);
afterCompile();
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2904,7 +2904,7 @@
@Override
public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector,
MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt,
- Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
+ Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter, IRequestParameters requestParameters)
throws AlgebricksException, ACIDException {
Map<VarIdentifier, IAObject> externalVars = createExternalVariables(stmtParams, stmtRewriter);
@@ -2916,7 +2916,7 @@
// Query Compilation (happens under the same ongoing metadata transaction)
return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars,
- responsePrinter, warningCollector);
+ responsePrinter, warningCollector, requestParameters);
}
private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -2955,7 +2955,7 @@
// transaction)
return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter,
- warningCollector);
+ warningCollector, null);
}
protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
@@ -3419,8 +3419,8 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
- final JobSpecification jobSpec =
- rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams, stmtRewriter);
+ final JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams,
+ stmtRewriter, requestParameters);
// update stats with count of compile-time warnings. needs to be adapted for multi-statement.
stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
afterCompile();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index d5ea685..6c8f21c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -59,6 +59,7 @@
private final int statementCategoryRestrictionMask;
private final String statement;
private final boolean forceDropDataset;
+ private final boolean skipAdmissionPolicy;
public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
ResultProperties resultProperties, Stats stats, StatementProperties statementProperties,
@@ -83,6 +84,16 @@
IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement,
int statementCategoryRestrictionMask, boolean forceDropDataset) {
+ this(requestReference, statement, resultSet, resultProperties, stats, statementProperties, outMetadata,
+ clientContextId, optionalParameters, statementParameters, multiStatement,
+ statementCategoryRestrictionMask, forceDropDataset, false);
+ }
+
+ public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
+ ResultProperties resultProperties, Stats stats, StatementProperties statementProperties,
+ IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
+ Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement,
+ int statementCategoryRestrictionMask, boolean forceDropDataset, boolean skipAdmissionPolicy) {
this.requestReference = requestReference;
this.statement = statement;
this.resultSet = resultSet;
@@ -96,6 +107,7 @@
this.multiStatement = multiStatement;
this.statementCategoryRestrictionMask = statementCategoryRestrictionMask;
this.forceDropDataset = forceDropDataset;
+ this.skipAdmissionPolicy = skipAdmissionPolicy;
}
@Override
@@ -149,6 +161,11 @@
}
@Override
+ public boolean isSkipAdmissionPolicy() {
+ return skipAdmissionPolicy;
+ }
+
+ @Override
public Map<String, IAObject> getStatementParameters() {
return statementParameters;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0..9472da5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -253,7 +253,7 @@
clfrqs = new CompiledStatements.CompiledUpsertStatement(feedConn.getDataverseName(),
feedConn.getDatasetName(), feedConnQuery, stmtUpsert.getVarCounter(), null, null);
}
- return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null);
+ return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null, null);
}
private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,