Merge "Merge branch 'gerrit/trinity' into 'master'"
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 28549f3..9f2cbdd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.translator;
 
+import static org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.ICommonRequestParameters;
@@ -26,9 +29,10 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.IClusterCapacity;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -37,17 +41,17 @@
     protected final long creationTime = System.nanoTime();
     protected final Thread executor;
     protected final String statement;
-    protected final ClusterControllerService ccService;
     protected final String clientContextId;
+    protected final JobState jobState;
     protected volatile JobId jobId;
-    private String plan; // can be null
+    private volatile String plan; // can be null
 
-    public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
+    public ClientRequest(ICommonRequestParameters requestParameters) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
         this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
-        this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+        this.jobState = new JobState();
     }
 
     @Override
@@ -100,37 +104,89 @@
         return json;
     }
 
-    private void putJobDetails(ObjectNode json) {
-        if (jobId == null) {
-            json.putNull("jobId");
-        } else {
-            try {
-                json.put("jobId", jobId.toString());
-                JobRun jobRun = ccService.getJobManager().get(jobId);
-                if (jobRun != null) {
-                    json.put("jobStatus", String.valueOf(jobRun.getStatus()));
-                    putJobRequiredResources(json, jobRun);
-                    putTimes(json, jobRun);
-                }
-            } catch (Throwable th) {
-                // ignore
-            }
+    /**
+     * Records the job creation time, the cores and memory the job requires, and its initial status:
+     * {@link JobStatus#PENDING} if the job was queued, {@link JobStatus#RUNNING} otherwise.
+     */
+    @Override
+    public void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status) {
+        jobState.createTime = System.currentTimeMillis();
+        jobState.status = status == QUEUE ? JobStatus.PENDING : JobStatus.RUNNING;
+        jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+        jobState.requiredMemoryInBytes = requiredClusterCapacity.getAggregatedMemoryByteSize();
+    }
+
+    /**
+     * Records the job start time and marks the job as {@link JobStatus#RUNNING}.
+     */
+    @Override
+    public void jobStarted(JobId jobId) {
+        jobState.startTime = System.currentTimeMillis();
+        jobState.status = JobStatus.RUNNING;
+    }
+
+    /**
+     * Records the job end time and final status. When exceptions are reported, the error message is
+     * derived from the first one via {@link #processException(Exception)}.
+     */
+    @Override
+    public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+        jobState.endTime = System.currentTimeMillis();
+        jobState.status = jobStatus;
+        if (exceptions != null && !exceptions.isEmpty()) {
+            jobState.errorMsg = processException(exceptions.get(0));
         }
     }
 
-    private static void putTimes(ObjectNode json, JobRun jobRun) {
-        AMutableDateTime dateTime = new AMutableDateTime(0);
-        putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
-        putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
-        putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
-        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
+    /**
+     * Converts a job failure into the error message stored for this request.
+     *
+     * @param e the job exception
+     * @return the message of the exception returned by {@code ExceptionUtils.unwrap(e)}; may be null
+     */
+    protected String processException(Exception e) {
+        return ExceptionUtils.unwrap(e).getMessage();
     }
 
-    private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
-        IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
-        if (jobCapacity != null) {
-            json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
-            json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize());
+    /**
+     * Adds the job id and the tracked job state to {@code json}. Best-effort: any failure is ignored.
+     */
+    private void putJobDetails(ObjectNode json) {
+        try {
+            json.put("jobId", jobId != null ? jobId.toString() : null);
+            putJobState(json, jobState);
+        } catch (Throwable th) {
+            // ignore
+        }
+    }
+
+    /**
+     * Writes the job times, queue time (in seconds), status, required resources and, if present, the
+     * error message of {@code state} to {@code json}.
+     */
+    private static void putJobState(ObjectNode json, JobState state) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        // read each volatile timestamp once so the reported times and the queue time agree
+        long createTime = state.createTime;
+        long startTime = state.startTime;
+        long endTime = state.endTime;
+        putTime(json, createTime, "jobCreateTime", dateTime);
+        putTime(json, startTime, "jobStartTime", dateTime);
+        putTime(json, endTime, "jobEndTime", dateTime);
+        long queueTime = 0;
+        if (createTime > 0) {
+            // createTime stays 0 until jobCreated() is called; without this guard the queue time would be
+            // measured from the epoch. A job that ended without ever starting stops queueing at its end time.
+            long queueEnd = startTime > 0 ? startTime : (endTime > 0 ? endTime : System.currentTimeMillis());
+            queueTime = queueEnd - createTime;
+        }
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+        json.put("jobStatus", String.valueOf(state.status));
+        json.put("jobRequiredCPUs", state.requiredCPUs);
+        json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+        if (state.errorMsg != null) {
+            json.put("error", state.errorMsg);
         }
     }
 
@@ -140,4 +196,19 @@
             json.put(label, dateTime.toSimpleString());
         }
     }
+
+    /**
+     * Job state recorded by the job lifecycle callbacks and read back when the request is rendered as
+     * JSON. The time fields hold {@link System#currentTimeMillis()} values and stay 0 until the
+     * corresponding callback has been invoked.
+     */
+    static class JobState {
+        volatile long createTime;
+        volatile long startTime;
+        volatile long endTime;
+        volatile long requiredMemoryInBytes;
+        volatile int requiredCPUs;
+        volatile JobStatus status;
+        volatile String errorMsg;
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 6ded860..524af43 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.http.HttpHeaders;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,9 +47,8 @@
     }
 
     @Override
-    public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
-            throws HyracksDataException {
-        return new ClientRequest(requestParameters, appCtx);
+    public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
+        return new ClientRequest(requestParameters);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 165cc75..93531bd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -166,7 +166,7 @@
             return resultStatus;
         }
 
-        HttpResponseStatus getHttpStatus() {
+        public HttpResponseStatus getHttpStatus() {
             return httpResponseStatus;
         }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8821c67..4d654d5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
@@ -89,7 +90,8 @@
     // *** IJobLifecycleListener
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
+    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException {
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
             if (property != null) {
@@ -119,7 +121,7 @@
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
@@ -128,8 +130,8 @@
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             LOGGER.debug("notified of ingestion job finish {}", jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index 9d1e108..c85e3b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -223,7 +224,13 @@
     }
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
+
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
         if (globalTxInfo != null) {
             beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(),
@@ -232,11 +239,8 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
 
     }
-
-    @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
-    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3035496..2ef3657 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -558,6 +558,9 @@
                                 "Unexpected statement: " + kind);
                 }
             }
+        } catch (Exception ex) {
+            this.appCtx.getRequestTracker().incrementFailedRequests();
+            throw ex;
         } finally {
             // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
@@ -5409,6 +5412,7 @@
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
+            jobSpec.setRequestId(clientRequest.getId());
             if (atomicStatement != null) {
                 Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(),
                         ((InsertStatement) atomicStatement).getDataverseName(),
@@ -5576,7 +5580,7 @@
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
-        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
         this.appCtx.getRequestTracker().track(clientRequest);
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 64c16e9..ba3ffcd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -202,6 +202,7 @@
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties, appCtx);
         ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
+        ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker());
 
         // create event loop groups
         webManager = new WebManager();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 0e063ca..673611d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -79,7 +79,7 @@
         final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
         RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
                 null, null, "1", null, null, null, true);
-        ClientRequest request = new ClientRequest(requestParameters, appCtx);
+        ClientRequest request = new ClientRequest(requestParameters);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -96,7 +96,7 @@
         final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
         requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
                 null, null, null, true);
-        ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
+        ClientRequest request2 = new ClientRequest(requestParameters);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 7d66760..9ff0514 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -54,6 +54,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -137,8 +138,9 @@
         TestUserActor user = new TestUserActor("Xikui", mdProvider, null);
         Action start = user.startActivity(eventsListener);
         startingSubscriber.sync();
-        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
-        activeJobNotificationHandler.notifyJobStart(jobId);
+        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec,
+                IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        activeJobNotificationHandler.notifyJobStart(jobId, jobSpec);
         try {
             eventsListener.refreshStats(1000);
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 52d4225..883a0cb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.mockito.Mockito;
 
 public class TestClusterControllerActor extends Actor {
@@ -49,8 +50,8 @@
                 JobSpecification jobSpecification = Mockito.mock(JobSpecification.class);
                 Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
                         .thenReturn(entityId);
-                handler.notifyJobCreation(jobId, jobSpecification);
-                handler.notifyJobStart(jobId);
+                handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE);
+                handler.notifyJobStart(jobId, null);
             }
         };
         add(startJob);
@@ -72,7 +73,7 @@
         Action delivery = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                handler.notifyJobFinish(jobId, jobStatus, exceptions);
+                handler.notifyJobFinish(jobId, null, jobStatus, exceptions);
             }
         };
         add(delivery);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 3157c64..8d2c91a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,8 +18,14 @@
  */
 package org.apache.asterix.common.api;
 
+import java.util.List;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -97,4 +103,30 @@
      */
     ObjectNode asJson();
 
+    /**
+     * Called when the job is created.
+     *
+     * @param jobId the job id
+     * @param requiredClusterCapacity the required resources by the job
+     * @param status the status of the job; whether it will be executed or queued
+     */
+    void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status);
+
+    /**
+     * Called when the job starts running.
+     *
+     * @param jobId the job id
+     */
+    void jobStarted(JobId jobId);
+
+    /**
+     * Called when the job finishes.
+     *
+     * @param jobId the job id
+     * @param jobStatus the final job status
+     * @param exceptions exceptions encountered if any
+     */
+    void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions);
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 57bff8f..04adce9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.api;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -36,13 +35,11 @@
     /**
      * Generates a {@link IClientRequest} based on the requests parameters
      *
-     * @param requestParameters
-     * @param appCtx
+     * @param requestParameters the request parameters
      * @return the client request
-     * @throws HyracksDataException
+     * @throws HyracksDataException HyracksDataException
      */
-    IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
-            throws HyracksDataException;
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
 
     /**
      * Ensures a client's request can be executed before its job is started
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015..230eac3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -21,8 +21,9 @@
 import java.util.Collection;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
 
-public interface IRequestTracker {
+public interface IRequestTracker extends IJobLifecycleListener {
 
     /**
      * Starts tracking {@code request}
@@ -81,4 +82,14 @@
      * @return the total number of requests since cluster start/restart
      */
     long getTotalNumberOfRequests();
+
+    /**
+     * Increments the total number of failed requests
+     */
+    void incrementFailedRequests();
+
+    /**
+     * @return the total number of failed requests
+     */
+    long getTotalNumberOfFailedRequests();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 56d3eea..d8a8b1a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 @ThreadSafe
@@ -48,17 +49,19 @@
     private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) {
         getJobParticipatingNodes(spec, null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) {
         // nothing to do
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) {
         nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6..5771201 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,11 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 public class RequestTracker implements IRequestTracker {
 
@@ -39,11 +45,13 @@
     private final CircularFifoQueue<IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
+    private final AtomicLong numOfFailedRequests;
 
     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
         completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
         numRequests = new AtomicLong(0);
+        numOfFailedRequests = new AtomicLong(0);
     }
 
     @Override
@@ -120,4 +128,61 @@
     public long getTotalNumberOfRequests() {
         return numRequests.get();
     }
+
+    @Override
+    public void incrementFailedRequests() {
+        numOfFailedRequests.incrementAndGet();
+    }
+
+    @Override
+    public long getTotalNumberOfFailedRequests() {
+        return numOfFailedRequests.get();
+    }
+
+    /**
+     * Forwards the job creation event to the running request that submitted the job, if any.
+     */
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
+        IClientRequest clientRequest = getJobClientRequest(spec);
+        if (clientRequest != null) {
+            clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+        }
+    }
+
+    /**
+     * Forwards the job start event to the running request that submitted the job, if any.
+     */
+    @Override
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
+        IClientRequest clientRequest = getJobClientRequest(spec);
+        if (clientRequest != null) {
+            clientRequest.jobStarted(jobId);
+        }
+    }
+
+    /**
+     * Forwards the job finish event to the running request that submitted the job, if any.
+     */
+    @Override
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
+        IClientRequest clientRequest = getJobClientRequest(spec);
+        if (clientRequest != null) {
+            clientRequest.jobFinished(jobId, jobStatus, exceptions);
+        }
+    }
+
+    /**
+     * Looks up the running request identified by the request id carried on {@code spec}.
+     *
+     * @param spec the job specification
+     * @return the matching running request, or null if the specification has no request id or no
+     *         running request is registered under that id
+     */
+    private IClientRequest getJobClientRequest(JobSpecification spec) {
+        String requestId = spec.getRequestId();
+        return requestId != null ? runningRequests.get(requestId) : null;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
index be75ecb..82d8cb7 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -50,18 +51,18 @@
         jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
 
         JobId jobId = new JobId(1);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
         // make sure nc1 has a pending job
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
-        nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+        nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, null);
         // make sure nc1 doesn't have pending jobs anymore
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
 
         // make sure node doesn't have pending jobs after failure
         jobId = new JobId(2);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 338c331..6773dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * A listener for job related events
@@ -29,27 +30,33 @@
     /**
      * Notify the listener that a job has been created
      *
-     * @param jobId
-     * @param spec
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param status the status of the job; whether it will be executed or queued
+     * @throws HyracksException if the listener fails to handle the job creation notification
      */
-    void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+    void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException;
 
     /**
      * Notify the listener that the job has started on the cluster controller
      *
-     * @param jobId
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     *
+     * @throws HyracksException if the listener fails to handle the job start notification
      */
-    void notifyJobStart(JobId jobId) throws HyracksException;
+    void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException;
 
     /**
      * Notify the listener that the job has been terminated, passing exceptions in case of failure
      *
-     * @param jobId
-     * @param jobStatus
-     * @param exceptions
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param jobStatus the job status
+     * @param exceptions the job exceptions
+     * @throws HyracksException if the listener fails to handle the job finish notification
      */
-    void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
+    void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 8165e49..f644703 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -82,6 +82,8 @@
 
     private long maxWarnings;
 
+    private String requestId;
+
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
     private IGlobalJobDataFactory globalJobDataFactory;
@@ -258,6 +260,14 @@
         this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
     }
 
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
     public void setFrameSize(int frameSize) {
         this.frameSize = frameSize;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index de166dd..555d37e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
@@ -79,22 +80,23 @@
         jobLifecycleListeners.add(jobLifecycleListener);
     }
 
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobStart(jobId);
+            l.notifyJobStart(jobId, spec);
         }
     }
 
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobFinish(jobId, jobStatus, exceptions);
+            l.notifyJobFinish(jobId, spec, jobStatus, exceptions);
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobCreation(jobId, spec);
+            l.notifyJobCreation(jobId, spec, status);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acd..1b0f377 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -115,7 +115,7 @@
 
     public void startJob() throws HyracksException {
         startRunnableActivityClusters();
-        ccs.getContext().notifyJobStart(jobRun.getJobId());
+        ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification());
     }
 
     public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index b2795d4..f65b261 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -130,4 +130,24 @@
      * @return the maximum number of jobs to queue before rejecting new jobs
      */
     int getJobQueueCapacity();
+
+    /**
+     * @return total successful jobs
+     */
+    long getSuccessfulJobs();
+
+    /**
+     * @return total failed jobs
+     */
+    long getTotalFailedJobs();
+
+    /**
+     * @return total cancelled jobs
+     */
+    long getTotalCancelledJobs();
+
+    /**
+     * @return total rejected jobs
+     */
+    long getTotalRejectedJobs();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..9165953 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -30,10 +30,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.IError;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -66,11 +68,19 @@
     private final Map<JobId, JobRun> runMapArchive;
     private final Map<JobId, List<Exception>> runMapHistory;
     private final IJobCapacityController jobCapacityController;
+    private final AtomicLong successfulJobs;
+    private final AtomicLong totalFailedJobs;
+    private final AtomicLong totalCancelledJobs;
+    private final AtomicLong totalRejectedJobs;
     private IJobQueue jobQueue;
 
     public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) {
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
+        this.successfulJobs = new AtomicLong();
+        this.totalFailedJobs = new AtomicLong();
+        this.totalCancelledJobs = new AtomicLong();
+        this.totalRejectedJobs = new AtomicLong();
         try {
             Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
                     .getConstructor(IJobManager.class, IJobCapacityController.class);
@@ -84,7 +94,7 @@
             jobQueue = new FIFOJobQueue(this, jobCapacityController);
         }
         activeRunMap = new HashMap<>();
-        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+        runMapArchive = new LinkedHashMap<>() {
             private static final long serialVersionUID = -1406441385508773629L;
 
             @Override
@@ -92,7 +102,7 @@
                 return size() > ccConfig.getJobHistorySize();
             }
         };
-        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+        runMapHistory = new LinkedHashMap<>() {
             private static final long serialVersionUID = 7572062687032652986L;
             /** history size + 1 is for the case when history size = 0 */
             private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
@@ -108,18 +118,24 @@
     public void add(JobRun jobRun) throws HyracksException {
         checkJob(jobRun);
         JobSpecification job = jobRun.getJobSpecification();
-        IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
-        CCServiceContext serviceCtx = ccs.getContext();
-        serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
-        switch (status) {
-            case QUEUE:
-                queueJob(jobRun);
-                break;
-            case EXECUTE:
-                executeJob(jobRun);
-                break;
-            default:
-                throw new IllegalStateException("unknown submission status: " + status);
+        IJobCapacityController.JobSubmissionStatus status;
+        try {
+            status = jobCapacityController.allocate(job);
+            CCServiceContext serviceCtx = ccs.getContext();
+            serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
+            switch (status) {
+                case QUEUE:
+                    queueJob(jobRun);
+                    break;
+                case EXECUTE:
+                    executeJob(jobRun);
+                    break;
+                default:
+                    throw new IllegalStateException("unknown submission status: " + status);
+            }
+        } catch (HyracksDataException ex) {
+            handleException(ex);
+            throw ex;
         }
     }
 
@@ -132,11 +148,13 @@
             // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
             jobRun.getExecutor().cancelJob(callback);
+            incrementCancelledJobs();
             return;
         }
         // Removes a pending job.
         JobRun jobRun = jobQueue.remove(jobId);
         if (jobRun != null) {
+            incrementCancelledJobs();
             List<Exception> exceptions =
                     Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
             // Since the job has not been executed, we only need to update its status and lifecyle here.
@@ -146,7 +164,8 @@
             CCServiceContext serviceCtx = ccs.getContext();
             if (serviceCtx != null) {
                 try {
-                    serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                    serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION,
+                            exceptions);
                 } catch (Exception e) {
                     LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
                     throw HyracksDataException.create(e);
@@ -219,11 +238,18 @@
     @Override
     public void finalComplete(JobRun run) throws HyracksException {
         checkJob(run);
+        if (run.getPendingStatus() == JobStatus.FAILURE) {
+            incrementFailedJobs();
+        } else if (run.getPendingStatus() == JobStatus.TERMINATED) {
+            incrementSuccessfulJobs();
+        }
+
         JobId jobId = run.getJobId();
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
         try {
-            serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+            serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
+                    run.getPendingExceptions());
         } catch (Exception e) {
             LOGGER.error("Exception notifying job finish {}", jobId, e);
             caughtException = e;
@@ -301,6 +327,26 @@
         return ccs.getCCConfig().getJobQueueCapacity();
     }
 
+    @Override
+    public long getSuccessfulJobs() {
+        return successfulJobs.get();
+    }
+
+    @Override
+    public long getTotalFailedJobs() {
+        return totalFailedJobs.get();
+    }
+
+    @Override
+    public long getTotalCancelledJobs() {
+        return totalCancelledJobs.get();
+    }
+
+    @Override
+    public long getTotalRejectedJobs() {
+        return totalRejectedJobs.get();
+    }
+
     private void pickJobsToRun() throws HyracksException {
         List<JobRun> selectedRuns = jobQueue.pull();
         for (JobRun run : selectedRuns) {
@@ -356,4 +402,31 @@
         final JobSpecification job = jobRun.getJobSpecification();
         jobCapacityController.release(job);
     }
+
+    /** Counts a rejected job if {@code ex} is a queue-full or over-capacity error; other errors are ignored. */
+    private void handleException(HyracksException ex) {
+        if (ex.getError().isPresent()) {
+            // Compare by identity instead of casting: the IError may not be a Hyracks ErrorCode.
+            IError error = ex.getError().get();
+            if (error == ErrorCode.JOB_QUEUE_FULL || error == ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY) {
+                incrementRejectedJobs();
+            }
+        }
+    }
+
+    private void incrementSuccessfulJobs() {
+        successfulJobs.incrementAndGet();
+    }
+
+    private void incrementFailedJobs() {
+        totalFailedJobs.incrementAndGet();
+    }
+
+    private void incrementCancelledJobs() {
+        totalCancelledJobs.incrementAndGet();
+    }
+
+    private void incrementRejectedJobs() {
+        totalRejectedJobs.incrementAndGet();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e2..a0c2ce4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultStateRecord;
@@ -78,7 +79,8 @@
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
         }
@@ -89,12 +91,13 @@
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         jobResultLocations.get(jobId).getRecord().start();
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         if (exceptions == null || exceptions.isEmpty()) {
             final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
             if (resultJobRecord == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29..19fdcfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,8 @@
     private final Set<JobId> finishWithoutStart = new HashSet<>();
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
         if (created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has been created before");
             increment(doubleCreated, jobId);
@@ -62,7 +64,7 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         if (!created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been created");
             startWithoutCreate.add(jobId);
@@ -75,7 +77,8 @@
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         if (!started.contains(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
             finishWithoutStart.add(jobId);