[ASTERIXDB-3343][API] Refactor and track other requests
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Refactor code and track other requests:
INSERT/UPSERT/DELETE/COPY TO/COPY FROM and queries.
Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2ef3657..340cd57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4007,11 +4007,10 @@
spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
numParticipatingNodes, numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+ String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
- clientRequest.setJobId(jobId);
+ final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
+ jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4152,8 +4151,9 @@
throw e;
}
};
+ String reqId = reqParams.getRequestReference().getUuid();
IRequestTracker requestTracker = appCtx.getRequestTracker();
- ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
+ ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
reqParams, true, stmt, clientRequest);
@@ -4179,8 +4179,7 @@
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- clientRequest.setJobId(jobId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4248,11 +4247,11 @@
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
- clientRequest.setJobId(jobId);
+ final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(),
+ clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4280,6 +4279,15 @@
}
}
+ private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
+ String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception {
+ jobSpec.setRequestId(reqId);
+ JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId);
+ clientRequest.setJobId(jobId);
+ return jobId;
+ }
+
@Override
public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector,
MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt,
@@ -5393,9 +5401,9 @@
IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
MetadataProvider metadataProvider, Statement atomicStatement) throws Exception {
+ String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+ final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
if (cancellable) {
clientRequest.markCancellable();
}
@@ -5412,7 +5420,6 @@
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
- jobSpec.setRequestId(clientRequest.getId());
if (atomicStatement != null) {
Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(),
((InsertStatement) atomicStatement).getDataverseName(),
@@ -5430,12 +5437,7 @@
participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
}
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId,
- requestParameters.getRequestReference().getUuid(), requestParameters.getClientContextId());
- }
- clientRequest.setJobId(jobId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
if (jId != null) {
jId.setValue(jobId);
}