[ASTERIXDB-3343][API] Capture job state changes in client requests

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Instead of fetching the job state for a client request from
the job manager, keep track of the request's job state while
the job transitions from one state to another. Otherwise,
the job archive in the job manager could have already recycled
by the time someone tries to fetch the details of completed requests.

- make IRequestTracker extends IJobLifecycleListener.
- add the client request id as part of the job spec.
  this client request id will be used to report back
  to the request tracker about the job associated with
  the request id.

Change-Id: I638682d48651ba0e771c7590ec875a3af1050ae3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18279
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 28549f3..9f2cbdd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.translator;
 
+import static org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.ICommonRequestParameters;
@@ -26,9 +29,10 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.IClusterCapacity;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -37,17 +41,17 @@
     protected final long creationTime = System.nanoTime();
     protected final Thread executor;
     protected final String statement;
-    protected final ClusterControllerService ccService;
     protected final String clientContextId;
+    protected final JobState jobState;
     protected volatile JobId jobId;
-    private String plan; // can be null
+    private volatile String plan; // can be null
 
-    public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
+    public ClientRequest(ICommonRequestParameters requestParameters) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
         this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
-        this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+        this.jobState = new JobState();
     }
 
     @Override
@@ -100,37 +104,55 @@
         return json;
     }
 
-    private void putJobDetails(ObjectNode json) {
-        if (jobId == null) {
-            json.putNull("jobId");
-        } else {
-            try {
-                json.put("jobId", jobId.toString());
-                JobRun jobRun = ccService.getJobManager().get(jobId);
-                if (jobRun != null) {
-                    json.put("jobStatus", String.valueOf(jobRun.getStatus()));
-                    putJobRequiredResources(json, jobRun);
-                    putTimes(json, jobRun);
-                }
-            } catch (Throwable th) {
-                // ignore
-            }
+    @Override
+    public void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status) {
+        jobState.createTime = System.currentTimeMillis();
+        jobState.status = status == QUEUE ? JobStatus.PENDING : JobStatus.RUNNING;
+        jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+        jobState.requiredMemoryInBytes = requiredClusterCapacity.getAggregatedMemoryByteSize();
+    }
+
+    @Override
+    public void jobStarted(JobId jobId) {
+        jobState.startTime = System.currentTimeMillis();
+        jobState.status = JobStatus.RUNNING;
+    }
+
+    @Override
+    public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+        jobState.endTime = System.currentTimeMillis();
+        jobState.status = jobStatus;
+        if (exceptions != null && !exceptions.isEmpty()) {
+            jobState.errorMsg = processException(exceptions.get(0));
         }
     }
 
-    private static void putTimes(ObjectNode json, JobRun jobRun) {
-        AMutableDateTime dateTime = new AMutableDateTime(0);
-        putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
-        putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
-        putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
-        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
+    protected String processException(Exception e) {
+        return ExceptionUtils.unwrap(e).getMessage();
     }
 
-    private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
-        IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
-        if (jobCapacity != null) {
-            json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
-            json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize());
+    private void putJobDetails(ObjectNode json) {
+        try {
+            json.put("jobId", jobId != null ? jobId.toString() : null);
+            putJobState(json, jobState);
+        } catch (Throwable th) {
+            // ignore
+        }
+    }
+
+    private static void putJobState(ObjectNode json, JobState state) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        putTime(json, state.createTime, "jobCreateTime", dateTime);
+        putTime(json, state.startTime, "jobStartTime", dateTime);
+        putTime(json, state.endTime, "jobEndTime", dateTime);
+        long queueTime = (state.startTime > 0 ? state.startTime : System.currentTimeMillis()) - state.createTime;
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+        json.put("jobStatus", String.valueOf(state.status));
+        json.put("jobRequiredCPUs", state.requiredCPUs);
+        json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+        if (state.errorMsg != null) {
+            json.put("error", state.errorMsg);
         }
     }
 
@@ -140,4 +162,14 @@
             json.put(label, dateTime.toSimpleString());
         }
     }
+
+    static class JobState {
+        volatile long createTime;
+        volatile long startTime;
+        volatile long endTime;
+        volatile long requiredMemoryInBytes;
+        volatile int requiredCPUs;
+        volatile JobStatus status;
+        volatile String errorMsg;
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 6893d5c..84bee6a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.http.HttpHeaders;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -47,9 +46,8 @@
     }
 
     @Override
-    public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
-            throws HyracksDataException {
-        return new ClientRequest(requestParameters, appCtx);
+    public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
+        return new ClientRequest(requestParameters);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8821c67..4d654d5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
@@ -89,7 +90,8 @@
     // *** IJobLifecycleListener
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
+    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException {
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
             if (property != null) {
@@ -119,7 +121,7 @@
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
@@ -128,8 +130,8 @@
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             LOGGER.debug("notified of ingestion job finish {}", jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3cac3d8..c89097b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4869,6 +4869,7 @@
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
+            jobSpec.setRequestId(clientRequest.getId());
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId,
@@ -5240,7 +5241,7 @@
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
-        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
         this.appCtx.getRequestTracker().track(clientRequest);
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 1b011ae..31ec44c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -171,6 +171,7 @@
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties, appCtx);
         ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
+        ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker());
 
         // create event loop groups
         webManager = new WebManager();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 0e063ca..673611d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -79,7 +79,7 @@
         final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
         RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
                 null, null, "1", null, null, null, true);
-        ClientRequest request = new ClientRequest(requestParameters, appCtx);
+        ClientRequest request = new ClientRequest(requestParameters);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -96,7 +96,7 @@
         final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
         requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
                 null, null, null, true);
-        ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
+        ClientRequest request2 = new ClientRequest(requestParameters);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 126edb1..4e5d531 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -135,8 +136,9 @@
         TestUserActor user = new TestUserActor("Xikui", mdProvider, null);
         Action start = user.startActivity(eventsListener);
         startingSubscriber.sync();
-        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
-        activeJobNotificationHandler.notifyJobStart(jobId);
+        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec,
+                IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        activeJobNotificationHandler.notifyJobStart(jobId, jobSpec);
         try {
             eventsListener.refreshStats(1000);
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 52d4225..883a0cb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.mockito.Mockito;
 
 public class TestClusterControllerActor extends Actor {
@@ -49,8 +50,8 @@
                 JobSpecification jobSpecification = Mockito.mock(JobSpecification.class);
                 Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
                         .thenReturn(entityId);
-                handler.notifyJobCreation(jobId, jobSpecification);
-                handler.notifyJobStart(jobId);
+                handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE);
+                handler.notifyJobStart(jobId, null);
             }
         };
         add(startJob);
@@ -72,7 +73,7 @@
         Action delivery = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                handler.notifyJobFinish(jobId, jobStatus, exceptions);
+                handler.notifyJobFinish(jobId, null, jobStatus, exceptions);
             }
         };
         add(delivery);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 3157c64..8d2c91a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,8 +18,14 @@
  */
 package org.apache.asterix.common.api;
 
+import java.util.List;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -97,4 +103,30 @@
      */
     ObjectNode asJson();
 
+    /**
+     * Called when the job is created.
+     *
+     * @param jobId the job id
+     * @param requiredClusterCapacity the required resources by the job
+     * @param status the status of the job; whether it will be executed or queued
+     */
+    void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status);
+
+    /**
+     * Called when the job starts running.
+     *
+     * @param jobId the job id
+     */
+    void jobStarted(JobId jobId);
+
+    /**
+     * Called when the job finishes.
+     *
+     * @param jobId the job id
+     * @param jobStatus the final job status
+     * @param exceptions exceptions encountered if any
+     */
+    void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions);
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index f0b4944..153eea9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.api;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 
@@ -35,13 +34,11 @@
     /**
      * Generates a {@link IClientRequest} based on the requests parameters
      *
-     * @param requestParameters
-     * @param appCtx
+     * @param requestParameters the request parameters
      * @return the client request
-     * @throws HyracksDataException
+     * @throws HyracksDataException HyracksDataException
      */
-    IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
-            throws HyracksDataException;
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
 
     /**
      * Ensures a client's request can be executed before its job is started
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index b2dc309..230eac3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -21,8 +21,9 @@
 import java.util.Collection;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
 
-public interface IRequestTracker {
+public interface IRequestTracker extends IJobLifecycleListener {
 
     /**
      * Starts tracking {@code request}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a..02f20f8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 @ThreadSafe
@@ -44,17 +45,19 @@
     private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) {
         getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) {
         // nothing to do
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) {
         nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c16f825..5771201 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,11 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 public class RequestTracker implements IRequestTracker {
 
@@ -132,4 +138,38 @@
     public long getTotalNumberOfFailedRequests() {
         return numOfFailedRequests.get();
     }
+
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest clientRequest = runningRequests.get(requestId);
+            if (clientRequest != null) {
+                clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest clientRequest = runningRequests.get(requestId);
+            if (clientRequest != null) {
+                clientRequest.jobStarted(jobId);
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest clientRequest = runningRequests.get(requestId);
+            if (clientRequest != null) {
+                clientRequest.jobFinished(jobId, jobStatus, exceptions);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
index be75ecb..82d8cb7 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -50,18 +51,18 @@
         jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
 
         JobId jobId = new JobId(1);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
         // make sure nc1 has a pending job
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
-        nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+        nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, null);
         // make sure nc1 doesn't have pending jobs anymore
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
 
         // make sure node doesn't have pending jobs after failure
         jobId = new JobId(2);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 338c331..6773dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * A listener for job related events
@@ -29,27 +30,33 @@
     /**
      * Notify the listener that a job has been created
      *
-     * @param jobId
-     * @param spec
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param status the status of the job; whether it will be executed or queued
+     * @throws HyracksException HyracksException
      */
-    void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+    void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException;
 
     /**
      * Notify the listener that the job has started on the cluster controller
      *
-     * @param jobId
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     *
+     * @throws HyracksException HyracksException
      */
-    void notifyJobStart(JobId jobId) throws HyracksException;
+    void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException;
 
     /**
      * Notify the listener that the job has been terminated, passing exceptions in case of failure
      *
-     * @param jobId
-     * @param jobStatus
-     * @param exceptions
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param jobStatus the job status
+     * @param exceptions the job exceptions
+     * @throws HyracksException HyracksException
      */
-    void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
+    void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 2c51d3d..2a11d79 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -82,6 +82,8 @@
 
     private long maxWarnings;
 
+    private String requestId;
+
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
     private IGlobalJobDataFactory globalJobDataFactory;
@@ -258,6 +260,14 @@
         this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
     }
 
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
     public void setFrameSize(int frameSize) {
         this.frameSize = frameSize;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index de166dd..555d37e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
@@ -79,22 +80,23 @@
         jobLifecycleListeners.add(jobLifecycleListener);
     }
 
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobStart(jobId);
+            l.notifyJobStart(jobId, spec);
         }
     }
 
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobFinish(jobId, jobStatus, exceptions);
+            l.notifyJobFinish(jobId, spec, jobStatus, exceptions);
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobCreation(jobId, spec);
+            l.notifyJobCreation(jobId, spec, status);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acd..1b0f377 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -115,7 +115,7 @@
 
     public void startJob() throws HyracksException {
         startRunnableActivityClusters();
-        ccs.getContext().notifyJobStart(jobRun.getJobId());
+        ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification());
     }
 
     public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 703294c..9165953 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -122,7 +122,7 @@
         try {
             status = jobCapacityController.allocate(job);
             CCServiceContext serviceCtx = ccs.getContext();
-            serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+            serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
             switch (status) {
                 case QUEUE:
                     queueJob(jobRun);
@@ -164,7 +164,8 @@
             CCServiceContext serviceCtx = ccs.getContext();
             if (serviceCtx != null) {
                 try {
-                    serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                    serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION,
+                            exceptions);
                 } catch (Exception e) {
                     LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
                     throw HyracksDataException.create(e);
@@ -247,7 +248,8 @@
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
         try {
-            serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+            serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
+                    run.getPendingExceptions());
         } catch (Exception e) {
             LOGGER.error("Exception notifying job finish {}", jobId, e);
             caughtException = e;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e2..a0c2ce4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultStateRecord;
@@ -78,7 +79,8 @@
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
         }
@@ -89,12 +91,13 @@
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         jobResultLocations.get(jobId).getRecord().start();
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         if (exceptions == null || exceptions.isEmpty()) {
             final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
             if (resultJobRecord == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29..4d4635a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,7 @@
     private final Set<JobId> finishWithoutStart = new HashSet<>();
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         if (created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has been created before");
             increment(doubleCreated, jobId);
@@ -62,7 +63,7 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         if (!created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been created");
             startWithoutCreate.add(jobId);
@@ -75,7 +76,7 @@
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
         if (!started.contains(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
             finishWithoutStart.add(jobId);