Merge "Merge branch 'gerrit/trinity' into 'master'"
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 28549f3..9f2cbdd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.translator;
+import static org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.ICommonRequestParameters;
@@ -26,9 +29,10 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.IClusterCapacity;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -37,17 +41,17 @@
protected final long creationTime = System.nanoTime();
protected final Thread executor;
protected final String statement;
- protected final ClusterControllerService ccService;
protected final String clientContextId;
+ protected final JobState jobState;
protected volatile JobId jobId;
- private String plan; // can be null
+ private volatile String plan; // can be null
- public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
+ public ClientRequest(ICommonRequestParameters requestParameters) {
super(requestParameters.getRequestReference());
this.clientContextId = requestParameters.getClientContextId();
this.statement = requestParameters.getStatement();
this.executor = Thread.currentThread();
- this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+ this.jobState = new JobState();
}
@Override
@@ -100,37 +104,55 @@
return json;
}
- private void putJobDetails(ObjectNode json) {
- if (jobId == null) {
- json.putNull("jobId");
- } else {
- try {
- json.put("jobId", jobId.toString());
- JobRun jobRun = ccService.getJobManager().get(jobId);
- if (jobRun != null) {
- json.put("jobStatus", String.valueOf(jobRun.getStatus()));
- putJobRequiredResources(json, jobRun);
- putTimes(json, jobRun);
- }
- } catch (Throwable th) {
- // ignore
- }
+ @Override
+ public void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+ IJobCapacityController.JobSubmissionStatus status) {
+ jobState.createTime = System.currentTimeMillis();
+ jobState.status = status == QUEUE ? JobStatus.PENDING : JobStatus.RUNNING;
+ jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+ jobState.requiredMemoryInBytes = requiredClusterCapacity.getAggregatedMemoryByteSize();
+ }
+
+ @Override
+ public void jobStarted(JobId jobId) {
+ jobState.startTime = System.currentTimeMillis();
+ jobState.status = JobStatus.RUNNING;
+ }
+
+ @Override
+ public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+ jobState.endTime = System.currentTimeMillis();
+ jobState.status = jobStatus;
+ if (exceptions != null && !exceptions.isEmpty()) {
+ jobState.errorMsg = processException(exceptions.get(0));
}
}
- private static void putTimes(ObjectNode json, JobRun jobRun) {
- AMutableDateTime dateTime = new AMutableDateTime(0);
- putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
- putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
- putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
- json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
+ protected String processException(Exception e) {
+ return ExceptionUtils.unwrap(e).getMessage();
}
- private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
- IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
- if (jobCapacity != null) {
- json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
- json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize());
+ private void putJobDetails(ObjectNode json) {
+ try {
+ json.put("jobId", jobId != null ? jobId.toString() : null);
+ putJobState(json, jobState);
+ } catch (Throwable th) {
+ // ignore
+ }
+ }
+
+ private static void putJobState(ObjectNode json, JobState state) {
+ AMutableDateTime dateTime = new AMutableDateTime(0);
+ putTime(json, state.createTime, "jobCreateTime", dateTime);
+ putTime(json, state.startTime, "jobStartTime", dateTime);
+ putTime(json, state.endTime, "jobEndTime", dateTime);
+ long queueTime = (state.startTime > 0 ? state.startTime : System.currentTimeMillis()) - state.createTime;
+ json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+ json.put("jobStatus", String.valueOf(state.status));
+ json.put("jobRequiredCPUs", state.requiredCPUs);
+ json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+ if (state.errorMsg != null) {
+ json.put("error", state.errorMsg);
}
}
@@ -140,4 +162,14 @@
json.put(label, dateTime.toSimpleString());
}
}
+
+ static class JobState {
+ volatile long createTime;
+ volatile long startTime;
+ volatile long endTime;
+ volatile long requiredMemoryInBytes;
+ volatile int requiredCPUs;
+ volatile JobStatus status;
+ volatile String errorMsg;
+ }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 6ded860..524af43 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.api.ISchedulableClientRequest;
import org.apache.asterix.common.api.RequestReference;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.http.HttpHeaders;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,9 +47,8 @@
}
@Override
- public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
- throws HyracksDataException {
- return new ClientRequest(requestParameters, appCtx);
+ public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
+ return new ClientRequest(requestParameters);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 165cc75..93531bd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -166,7 +166,7 @@
return resultStatus;
}
- HttpResponseStatus getHttpStatus() {
+ public HttpResponseStatus getHttpStatus() {
return httpResponseStatus;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8821c67..4d654d5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.util.SingleThreadEventProcessor;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
@@ -89,7 +90,8 @@
// *** IJobLifecycleListener
@Override
- public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
+ public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification,
+ IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException {
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
if (property != null) {
@@ -119,7 +121,7 @@
}
@Override
- public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
@@ -128,8 +130,8 @@
}
@Override
- public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
- throws HyracksException {
+ public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+ List<Exception> exceptions) throws HyracksException {
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
LOGGER.debug("notified of ingestion job finish {}", jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index 9d1e108..c85e3b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -223,7 +224,13 @@
}
@Override
- public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException {
+
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
if (globalTxInfo != null) {
beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(),
@@ -232,11 +239,8 @@
}
@Override
- public void notifyJobStart(JobId jobId) throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
}
-
- @Override
- public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
- }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3035496..2ef3657 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -558,6 +558,9 @@
"Unexpected statement: " + kind);
}
}
+ } catch (Exception ex) {
+ this.appCtx.getRequestTracker().incrementFailedRequests();
+ throw ex;
} finally {
// async queries are completed after their job completes
if (ResultDelivery.ASYNC != resultDelivery) {
@@ -5409,6 +5412,7 @@
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
+ jobSpec.setRequestId(clientRequest.getId());
if (atomicStatement != null) {
Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(),
((InsertStatement) atomicStatement).getDataverseName(),
@@ -5576,7 +5580,7 @@
}
protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
- final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+ final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
this.appCtx.getRequestTracker().track(clientRequest);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 64c16e9..ba3ffcd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -202,6 +202,7 @@
ccServiceCtx.setDistributedState(proxy);
MetadataManager.initialize(proxy, metadataProperties, appCtx);
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
+ ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker());
// create event loop groups
webManager = new WebManager();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 0e063ca..673611d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -79,7 +79,7 @@
final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
null, null, "1", null, null, null, true);
- ClientRequest request = new ClientRequest(requestParameters, appCtx);
+ ClientRequest request = new ClientRequest(requestParameters);
request.setJobId(new JobId(1));
request.markCancellable();
tracker.track(request);
@@ -96,7 +96,7 @@
final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
null, null, null, true);
- ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
+ ClientRequest request2 = new ClientRequest(requestParameters);
request2.setJobId(new JobId(2));
request2.markCancellable();
tracker.track(request2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 7d66760..9ff0514 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -54,6 +54,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -137,8 +138,9 @@
TestUserActor user = new TestUserActor("Xikui", mdProvider, null);
Action start = user.startActivity(eventsListener);
startingSubscriber.sync();
- activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
- activeJobNotificationHandler.notifyJobStart(jobId);
+ activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec,
+ IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ activeJobNotificationHandler.notifyJobStart(jobId, jobSpec);
try {
eventsListener.refreshStats(1000);
} catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 52d4225..883a0cb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.mockito.Mockito;
public class TestClusterControllerActor extends Actor {
@@ -49,8 +50,8 @@
JobSpecification jobSpecification = Mockito.mock(JobSpecification.class);
Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
.thenReturn(entityId);
- handler.notifyJobCreation(jobId, jobSpecification);
- handler.notifyJobStart(jobId);
+ handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ handler.notifyJobStart(jobId, null);
}
};
add(startJob);
@@ -72,7 +73,7 @@
Action delivery = new Action() {
@Override
protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
- handler.notifyJobFinish(jobId, jobStatus, exceptions);
+ handler.notifyJobFinish(jobId, null, jobStatus, exceptions);
}
};
add(delivery);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 3157c64..8d2c91a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,8 +18,14 @@
*/
package org.apache.asterix.common.api;
+import java.util.List;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -97,4 +103,30 @@
*/
ObjectNode asJson();
+ /**
+ * Called when the job is created.
+ *
+ * @param jobId the job id
+ * @param requiredClusterCapacity the required resources by the job
+ * @param status the status of the job; whether it will be executed or queued
+ */
+ void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+ IJobCapacityController.JobSubmissionStatus status);
+
+ /**
+ * Called when the job starts running.
+ *
+ * @param jobId the job id
+ */
+ void jobStarted(JobId jobId);
+
+ /**
+ * Called when the job finishes.
+ *
+ * @param jobId the job id
+ * @param jobStatus the final job status
+ * @param exceptions exceptions encountered if any
+ */
+ void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions);
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 57bff8f..04adce9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.common.api;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
@@ -36,13 +35,11 @@
/**
* Generates a {@link IClientRequest} based on the requests parameters
*
- * @param requestParameters
- * @param appCtx
+ * @param requestParameters the request parameters
* @return the client request
- * @throws HyracksDataException
+ * @throws HyracksDataException HyracksDataException
*/
- IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
- throws HyracksDataException;
+ IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
/**
* Ensures a client's request can be executed before its job is started
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015..230eac3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -21,8 +21,9 @@
import java.util.Collection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
-public interface IRequestTracker {
+public interface IRequestTracker extends IJobLifecycleListener {
/**
* Starts tracking {@code request}
@@ -81,4 +82,14 @@
* @return the total number of requests since cluster start/restart
*/
long getTotalNumberOfRequests();
+
+ /**
+ * increments total number of failed requests
+ */
+ void incrementFailedRequests();
+
+ /**
+ * @return the total number of failed requests
+ */
+ long getTotalNumberOfFailedRequests();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 56d3eea..d8a8b1a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.util.annotations.ThreadSafe;
@ThreadSafe
@@ -48,17 +49,19 @@
private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
@Override
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) {
getJobParticipatingNodes(spec, null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
}
@Override
- public synchronized void notifyJobStart(JobId jobId) {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) {
// nothing to do
}
@Override
- public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+ public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+ List<Exception> exceptions) {
nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6..5771201 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,11 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
public class RequestTracker implements IRequestTracker {
@@ -39,11 +45,13 @@
private final CircularFifoQueue<IClientRequest> completedRequests;
private final ICcApplicationContext ccAppCtx;
private final AtomicLong numRequests;
+ private final AtomicLong numOfFailedRequests;
public RequestTracker(ICcApplicationContext ccAppCtx) {
this.ccAppCtx = ccAppCtx;
completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
numRequests = new AtomicLong(0);
+ numOfFailedRequests = new AtomicLong(0);
}
@Override
@@ -120,4 +128,48 @@
public long getTotalNumberOfRequests() {
return numRequests.get();
}
+
+ @Override
+ public void incrementFailedRequests() {
+ numOfFailedRequests.incrementAndGet();
+ }
+
+ @Override
+ public long getTotalNumberOfFailedRequests() {
+ return numOfFailedRequests.get();
+ }
+
+ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException {
+ String requestId = spec.getRequestId();
+ if (requestId != null) {
+ IClientRequest clientRequest = runningRequests.get(requestId);
+ if (clientRequest != null) {
+ clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+ }
+ }
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
+ String requestId = spec.getRequestId();
+ if (requestId != null) {
+ IClientRequest clientRequest = runningRequests.get(requestId);
+ if (clientRequest != null) {
+ clientRequest.jobStarted(jobId);
+ }
+ }
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
+ String requestId = spec.getRequestId();
+ if (requestId != null) {
+ IClientRequest clientRequest = runningRequests.get(requestId);
+ if (clientRequest != null) {
+ clientRequest.jobFinished(jobId, jobStatus, exceptions);
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
index be75ecb..82d8cb7 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -50,18 +51,18 @@
jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
JobId jobId = new JobId(1);
- nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+ nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
// make sure nc1 has a pending job
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
- nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+ nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, null);
// make sure nc1 doesn't have pending jobs anymore
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
// make sure node doesn't have pending jobs after failure
jobId = new JobId(2);
- nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+ nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 338c331..6773dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
/**
* A listener for job related events
@@ -29,27 +30,33 @@
/**
* Notify the listener that a job has been created
*
- * @param jobId
- * @param spec
- * @throws HyracksException
+ * @param jobId the job id
+ * @param spec the job specification
+ * @param status the status of the job; whether it will be executed or queued
+ * @throws HyracksException HyracksException
*/
- void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+ void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException;
/**
* Notify the listener that the job has started on the cluster controller
*
- * @param jobId
- * @throws HyracksException
+ * @param jobId the job id
+ * @param spec the job specification
+ *
+ * @throws HyracksException HyracksException
*/
- void notifyJobStart(JobId jobId) throws HyracksException;
+ void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException;
/**
* Notify the listener that the job has been terminated, passing exceptions in case of failure
*
- * @param jobId
- * @param jobStatus
- * @param exceptions
- * @throws HyracksException
+ * @param jobId the job id
+ * @param spec the job specification
+ * @param jobStatus the job status
+ * @param exceptions the job exceptions
+ * @throws HyracksException HyracksException
*/
- void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
+ void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 8165e49..f644703 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -82,6 +82,8 @@
private long maxWarnings;
+ private String requestId;
+
private IJobletEventListenerFactory jobletEventListenerFactory;
private IGlobalJobDataFactory globalJobDataFactory;
@@ -258,6 +260,14 @@
this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
}
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
public void setFrameSize(int frameSize) {
this.frameSize = frameSize;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index de166dd..555d37e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ServiceContext;
@@ -79,22 +80,23 @@
jobLifecycleListeners.add(jobLifecycleListener);
}
- public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobStart(jobId);
+ l.notifyJobStart(jobId, spec);
}
}
- public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
- throws HyracksException {
+ public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+ List<Exception> exceptions) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobFinish(jobId, jobStatus, exceptions);
+ l.notifyJobFinish(jobId, spec, jobStatus, exceptions);
}
}
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobCreation(jobId, spec);
+ l.notifyJobCreation(jobId, spec, status);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acd..1b0f377 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -115,7 +115,7 @@
public void startJob() throws HyracksException {
startRunnableActivityClusters();
- ccs.getContext().notifyJobStart(jobRun.getJobId());
+ ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification());
}
public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index b2795d4..f65b261 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -130,4 +130,24 @@
* @return the maximum number of jobs to queue before rejecting new jobs
*/
int getJobQueueCapacity();
+
+ /**
+ * @return total successful jobs
+ */
+ long getSuccessfulJobs();
+
+ /**
+ * @return total failed jobs
+ */
+ long getTotalFailedJobs();
+
+ /**
+ * @return total cancelled jobs
+ */
+ long getTotalCancelledJobs();
+
+ /**
+ * @return total rejected jobs
+ */
+ long getTotalRejectedJobs();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..9165953 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -30,10 +30,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.IError;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -66,11 +68,19 @@
private final Map<JobId, JobRun> runMapArchive;
private final Map<JobId, List<Exception>> runMapHistory;
private final IJobCapacityController jobCapacityController;
+ private final AtomicLong successfulJobs;
+ private final AtomicLong totalFailedJobs;
+ private final AtomicLong totalCancelledJobs;
+ private final AtomicLong totalRejectedJobs;
private IJobQueue jobQueue;
public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) {
this.ccs = ccs;
this.jobCapacityController = jobCapacityController;
+ this.successfulJobs = new AtomicLong();
+ this.totalFailedJobs = new AtomicLong();
+ this.totalCancelledJobs = new AtomicLong();
+ this.totalRejectedJobs = new AtomicLong();
try {
Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
.getConstructor(IJobManager.class, IJobCapacityController.class);
@@ -84,7 +94,7 @@
jobQueue = new FIFOJobQueue(this, jobCapacityController);
}
activeRunMap = new HashMap<>();
- runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+ runMapArchive = new LinkedHashMap<>() {
private static final long serialVersionUID = -1406441385508773629L;
@Override
@@ -92,7 +102,7 @@
return size() > ccConfig.getJobHistorySize();
}
};
- runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+ runMapHistory = new LinkedHashMap<>() {
private static final long serialVersionUID = 7572062687032652986L;
/** history size + 1 is for the case when history size = 0 */
private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
@@ -108,18 +118,24 @@
public void add(JobRun jobRun) throws HyracksException {
checkJob(jobRun);
JobSpecification job = jobRun.getJobSpecification();
- IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
- CCServiceContext serviceCtx = ccs.getContext();
- serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
- switch (status) {
- case QUEUE:
- queueJob(jobRun);
- break;
- case EXECUTE:
- executeJob(jobRun);
- break;
- default:
- throw new IllegalStateException("unknown submission status: " + status);
+ IJobCapacityController.JobSubmissionStatus status;
+ try {
+ status = jobCapacityController.allocate(job);
+ CCServiceContext serviceCtx = ccs.getContext();
+ serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
+ switch (status) {
+ case QUEUE:
+ queueJob(jobRun);
+ break;
+ case EXECUTE:
+ executeJob(jobRun);
+ break;
+ default:
+ throw new IllegalStateException("unknown submission status: " + status);
+ }
+ } catch (HyracksDataException ex) {
+ handleException(ex);
+ throw ex;
}
}
@@ -132,11 +148,13 @@
// trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
+ incrementCancelledJobs();
return;
}
// Removes a pending job.
JobRun jobRun = jobQueue.remove(jobId);
if (jobRun != null) {
+ incrementCancelledJobs();
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
// Since the job has not been executed, we only need to update its status and lifecyle here.
@@ -146,7 +164,8 @@
CCServiceContext serviceCtx = ccs.getContext();
if (serviceCtx != null) {
try {
- serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+ serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION,
+ exceptions);
} catch (Exception e) {
LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
throw HyracksDataException.create(e);
@@ -219,11 +238,18 @@
@Override
public void finalComplete(JobRun run) throws HyracksException {
checkJob(run);
+ if (run.getPendingStatus() == JobStatus.FAILURE) {
+ incrementFailedJobs();
+ } else if (run.getPendingStatus() == JobStatus.TERMINATED) {
+ incrementSuccessfulJobs();
+ }
+
JobId jobId = run.getJobId();
Throwable caughtException = null;
CCServiceContext serviceCtx = ccs.getContext();
try {
- serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+ serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
+ run.getPendingExceptions());
} catch (Exception e) {
LOGGER.error("Exception notifying job finish {}", jobId, e);
caughtException = e;
@@ -301,6 +327,26 @@
return ccs.getCCConfig().getJobQueueCapacity();
}
+ @Override
+ public long getSuccessfulJobs() {
+ return successfulJobs.get();
+ }
+
+ @Override
+ public long getTotalFailedJobs() {
+ return totalFailedJobs.get();
+ }
+
+ @Override
+ public long getTotalCancelledJobs() {
+ return totalCancelledJobs.get();
+ }
+
+ @Override
+ public long getTotalRejectedJobs() {
+ return totalRejectedJobs.get();
+ }
+
private void pickJobsToRun() throws HyracksException {
List<JobRun> selectedRuns = jobQueue.pull();
for (JobRun run : selectedRuns) {
@@ -356,4 +402,31 @@
final JobSpecification job = jobRun.getJobSpecification();
jobCapacityController.release(job);
}
+
+ private void handleException(HyracksException ex) {
+ if (ex.getError().isPresent()) {
+ IError error = ex.getError().get();
+ switch ((ErrorCode) error) {
+ case JOB_QUEUE_FULL:
+ case JOB_REQUIREMENTS_EXCEED_CAPACITY:
+ incrementRejectedJobs();
+ }
+ }
+ }
+
+ private void incrementSuccessfulJobs() {
+ successfulJobs.incrementAndGet();
+ }
+
+ private void incrementFailedJobs() {
+ totalFailedJobs.incrementAndGet();
+ }
+
+ private void incrementCancelledJobs() {
+ totalCancelledJobs.incrementAndGet();
+ }
+
+ private void incrementRejectedJobs() {
+ totalRejectedJobs.incrementAndGet();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e2..a0c2ce4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultStateRecord;
@@ -78,7 +79,8 @@
}
@Override
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
}
@@ -89,12 +91,13 @@
}
@Override
- public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
jobResultLocations.get(jobId).getRecord().start();
}
@Override
- public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
if (exceptions == null || exceptions.isEmpty()) {
final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
if (resultJobRecord == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29..19fdcfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,7 +48,8 @@
private final Set<JobId> finishWithoutStart = new HashSet<>();
@Override
- public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException {
if (created.containsKey(jobId)) {
LOGGER.log(Level.WARN, "Job " + jobId + "has been created before");
increment(doubleCreated, jobId);
@@ -62,7 +64,7 @@
}
@Override
- public void notifyJobStart(JobId jobId) throws HyracksException {
+ public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
if (!created.containsKey(jobId)) {
LOGGER.log(Level.WARN, "Job " + jobId + "has not been created");
startWithoutCreate.add(jobId);
@@ -75,7 +77,8 @@
}
@Override
- public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
if (!started.contains(jobId)) {
LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
finishWithoutStart.add(jobId);