[NO ISSUE]: Track more requests + jobs counts
Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index f84e45a..da21769 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -166,7 +166,7 @@
return resultStatus;
}
- HttpResponseStatus getHttpStatus() {
+ public HttpResponseStatus getHttpStatus() {
return httpResponseStatus;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1110cd3..3cac3d8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -529,6 +529,9 @@
"Unexpected statement: " + kind);
}
}
+ } catch (Exception ex) {
+ this.appCtx.getRequestTracker().incrementFailedRequests();
+ throw ex;
} finally {
// async queries are completed after their job completes
if (ResultDelivery.ASYNC != resultDelivery) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015..b2dc309 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -81,4 +81,14 @@
* @return the total number of requests since cluster start/restart
*/
long getTotalNumberOfRequests();
+
+ /**
+ * increments total number of failed requests
+ */
+ void incrementFailedRequests();
+
+ /**
+ * @return the total number of failed requests
+ */
+ long getTotalNumberOfFailedRequests();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6..c16f825 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -39,11 +39,13 @@
private final CircularFifoQueue<IClientRequest> completedRequests;
private final ICcApplicationContext ccAppCtx;
private final AtomicLong numRequests;
+ private final AtomicLong numOfFailedRequests;
public RequestTracker(ICcApplicationContext ccAppCtx) {
this.ccAppCtx = ccAppCtx;
completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
numRequests = new AtomicLong(0);
+ numOfFailedRequests = new AtomicLong(0);
}
@Override
@@ -120,4 +122,14 @@
public long getTotalNumberOfRequests() {
return numRequests.get();
}
+
+ @Override
+ public void incrementFailedRequests() {
+ numOfFailedRequests.incrementAndGet();
+ }
+
+ @Override
+ public long getTotalNumberOfFailedRequests() {
+ return numOfFailedRequests.get();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index b2795d4..f65b261 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -130,4 +130,24 @@
* @return the maximum number of jobs to queue before rejecting new jobs
*/
int getJobQueueCapacity();
+
+ /**
+ * @return total successful jobs
+ */
+ long getSuccessfulJobs();
+
+ /**
+ * @return total failed jobs
+ */
+ long getTotalFailedJobs();
+
+ /**
+ * @return total cancelled jobs
+ */
+ long getTotalCancelledJobs();
+
+ /**
+ * @return total rejected jobs
+ */
+ long getTotalRejectedJobs();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..703294c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -30,10 +30,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.IError;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -66,11 +68,19 @@
private final Map<JobId, JobRun> runMapArchive;
private final Map<JobId, List<Exception>> runMapHistory;
private final IJobCapacityController jobCapacityController;
+ private final AtomicLong successfulJobs;
+ private final AtomicLong totalFailedJobs;
+ private final AtomicLong totalCancelledJobs;
+ private final AtomicLong totalRejectedJobs;
private IJobQueue jobQueue;
public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) {
this.ccs = ccs;
this.jobCapacityController = jobCapacityController;
+ this.successfulJobs = new AtomicLong();
+ this.totalFailedJobs = new AtomicLong();
+ this.totalCancelledJobs = new AtomicLong();
+ this.totalRejectedJobs = new AtomicLong();
try {
Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
.getConstructor(IJobManager.class, IJobCapacityController.class);
@@ -84,7 +94,7 @@
jobQueue = new FIFOJobQueue(this, jobCapacityController);
}
activeRunMap = new HashMap<>();
- runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+ runMapArchive = new LinkedHashMap<>() {
private static final long serialVersionUID = -1406441385508773629L;
@Override
@@ -92,7 +102,7 @@
return size() > ccConfig.getJobHistorySize();
}
};
- runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+ runMapHistory = new LinkedHashMap<>() {
private static final long serialVersionUID = 7572062687032652986L;
/** history size + 1 is for the case when history size = 0 */
private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
@@ -108,18 +118,24 @@
public void add(JobRun jobRun) throws HyracksException {
checkJob(jobRun);
JobSpecification job = jobRun.getJobSpecification();
- IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
- CCServiceContext serviceCtx = ccs.getContext();
- serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
- switch (status) {
- case QUEUE:
- queueJob(jobRun);
- break;
- case EXECUTE:
- executeJob(jobRun);
- break;
- default:
- throw new IllegalStateException("unknown submission status: " + status);
+ IJobCapacityController.JobSubmissionStatus status;
+ try {
+ status = jobCapacityController.allocate(job);
+ CCServiceContext serviceCtx = ccs.getContext();
+ serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+ switch (status) {
+ case QUEUE:
+ queueJob(jobRun);
+ break;
+ case EXECUTE:
+ executeJob(jobRun);
+ break;
+ default:
+ throw new IllegalStateException("unknown submission status: " + status);
+ }
+ } catch (HyracksDataException ex) {
+ handleException(ex);
+ throw ex;
}
}
@@ -132,11 +148,13 @@
// trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
+ incrementCancelledJobs();
return;
}
// Removes a pending job.
JobRun jobRun = jobQueue.remove(jobId);
if (jobRun != null) {
+ incrementCancelledJobs();
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
// Since the job has not been executed, we only need to update its status and lifecyle here.
@@ -219,6 +237,12 @@
@Override
public void finalComplete(JobRun run) throws HyracksException {
checkJob(run);
+ if (run.getPendingStatus() == JobStatus.FAILURE) {
+ incrementFailedJobs();
+ } else if (run.getPendingStatus() == JobStatus.TERMINATED) {
+ incrementSuccessfulJobs();
+ }
+
JobId jobId = run.getJobId();
Throwable caughtException = null;
CCServiceContext serviceCtx = ccs.getContext();
@@ -301,6 +325,26 @@
return ccs.getCCConfig().getJobQueueCapacity();
}
+ @Override
+ public long getSuccessfulJobs() {
+ return successfulJobs.get();
+ }
+
+ @Override
+ public long getTotalFailedJobs() {
+ return totalFailedJobs.get();
+ }
+
+ @Override
+ public long getTotalCancelledJobs() {
+ return totalCancelledJobs.get();
+ }
+
+ @Override
+ public long getTotalRejectedJobs() {
+ return totalRejectedJobs.get();
+ }
+
private void pickJobsToRun() throws HyracksException {
List<JobRun> selectedRuns = jobQueue.pull();
for (JobRun run : selectedRuns) {
@@ -356,4 +400,31 @@
final JobSpecification job = jobRun.getJobSpecification();
jobCapacityController.release(job);
}
+
+ private void handleException(HyracksException ex) {
+ if (ex.getError().isPresent()) {
+ IError error = ex.getError().get();
+ switch ((ErrorCode) error) {
+ case JOB_QUEUE_FULL:
+ case JOB_REQUIREMENTS_EXCEED_CAPACITY:
+ incrementRejectedJobs();
+ }
+ }
+ }
+
+ private void incrementSuccessfulJobs() {
+ successfulJobs.incrementAndGet();
+ }
+
+ private void incrementFailedJobs() {
+ totalFailedJobs.incrementAndGet();
+ }
+
+ private void incrementCancelledJobs() {
+ totalCancelledJobs.incrementAndGet();
+ }
+
+ private void incrementRejectedJobs() {
+ totalRejectedJobs.incrementAndGet();
+ }
}