[ASTERIXDB-3343][API] Refactor and track other requests

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Refactor code and track other requests:
INSERT/UPSERT/DELETE/COPY TO/COPY FROM and queries.

Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2ef3657..340cd57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4007,11 +4007,10 @@
                     spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
                             numParticipatingNodes, numParticipatingPartitions));
                 }
-                jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+                String reqId = requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = appCtx.getRequestTracker();
-                final ClientRequest clientRequest =
-                        (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
-                clientRequest.setJobId(jobId);
+                final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
+                jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4152,8 +4151,9 @@
                 throw e;
             }
         };
+        String reqId = reqParams.getRequestReference().getUuid();
         IRequestTracker requestTracker = appCtx.getRequestTracker();
-        ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
+        ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
                     reqParams, true, stmt, clientRequest);
@@ -4179,8 +4179,7 @@
                     jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
                             participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
                 }
-                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
-                clientRequest.setJobId(jobId);
+                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4248,11 +4247,11 @@
                     jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
                             participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
                 }
-                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+                String reqId = requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = appCtx.getRequestTracker();
-                final ClientRequest clientRequest =
-                        (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
-                clientRequest.setJobId(jobId);
+                final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
+                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(),
+                        clientRequest);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4280,6 +4279,15 @@
         }
     }
 
+    private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
+            String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception {
+        jobSpec.setRequestId(reqId);
+        JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+        LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId);
+        clientRequest.setJobId(jobId);
+        return jobId;
+    }
+
     @Override
     public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector,
             MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt,
@@ -5393,9 +5401,9 @@
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
             IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
             MetadataProvider metadataProvider, Statement atomicStatement) throws Exception {
+        String reqId = requestParameters.getRequestReference().getUuid();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
-        final ClientRequest clientRequest =
-                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+        final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId);
         if (cancellable) {
             clientRequest.markCancellable();
         }
@@ -5412,7 +5420,6 @@
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
-            jobSpec.setRequestId(clientRequest.getId());
             if (atomicStatement != null) {
                 Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(),
                         ((InsertStatement) atomicStatement).getDataverseName(),
@@ -5430,12 +5437,7 @@
                             participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
                 }
             }
-            jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId,
-                        requestParameters.getRequestReference().getUuid(), requestParameters.getClientContextId());
-            }
-            clientRequest.setJobId(jobId);
+            jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
             if (jId != null) {
                 jId.setValue(jobId);
             }