[ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is not ACTIVE
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Do not run jobs while cluster is not ACTIVE.
- Allow certain jobs to run regardless of cluster state.
Ext-ref: MB-62635
Change-Id: I9a027e46a9067e18e2fd5de57112f0d15addc702
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18488
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index bb428fa6..d8775c8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -282,7 +282,7 @@
JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false));
- JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true);
+ JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, true);
IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId);
FrameManager frameManager = new FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 3a81c09..4674f7e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -107,7 +107,7 @@
// TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
// We will need to design general exception handling mechanism for feeds.
setLocations(jobInfo.getRight());
- return JobUtils.runJob(hcc, feedJob, false);
+ return JobUtils.runJobIfActive(hcc, feedJob, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ad74d6c..0c736a2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -302,7 +302,7 @@
protected final APIFramework apiFramework;
protected final IRewriterFactory rewriterFactory;
protected final ExecutorService executorService;
- protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
+ protected final EnumSet<JobFlag> jobFlags = EnumSet.of(JobFlag.ENSURE_RUNNABLE);
protected final IMetadataLockManager lockManager;
protected final IMetadataLockUtil lockUtil;
protected final IResponsePrinter responsePrinter;
@@ -2435,7 +2435,7 @@
requestParameters.isForceDropDataset());
}
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobUtils.runJobIfActive(hcc, jobSpec, true);
}
} catch (Exception e2) {
// do no throw exception since still the metadata needs to be compensated.
@@ -4298,7 +4298,7 @@
private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception {
jobSpec.setRequestId(reqId);
- JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId);
clientRequest.setJobId(jobId);
return jobId;
@@ -5407,12 +5407,12 @@
private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
throws Exception {
- JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+ JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true);
}
private static List<IOperatorStats> runJob(IHyracksClientConnection hcc, JobSpecification jobSpec,
EnumSet<JobFlag> jobFlags, List<String> statOperatorNames) throws Exception {
- Pair<JobId, List<IOperatorStats>> p = JobUtils.runJob(hcc, jobSpec, jobFlags, true, statOperatorNames);
+ Pair<JobId, List<IOperatorStats>> p = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true, statOperatorNames);
return p.second;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9724174..0b7e7d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -22,6 +22,8 @@
import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
@@ -35,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.http.IQueryWebServerRegistrant;
@@ -69,6 +72,7 @@
import org.apache.asterix.common.api.INamespaceResolver;
import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.cluster.IGlobalTxManager;
import org.apache.asterix.common.config.AsterixExtension;
@@ -111,6 +115,7 @@
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.result.IJobResultCallback;
@@ -212,7 +217,7 @@
ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
ccServiceCtx.addJobLifecycleListener(globalTxManager);
- jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
+ jobCapacityController = new JobCapacityController(controllerService.getResourceManager(), this);
}
protected INamespaceResolver createNamespaceResolver(boolean useDatabaseResolution) {
@@ -441,4 +446,14 @@
public IJobResultCallback getJobResultCallback() {
return new JobResultCallback(appCtx);
}
+
+ @Override
+ public boolean acceptingJobs(Set<JobFlag> flags) {
+ // flags == null should not be needed since currently it's not null (but not enforced)
+ if (flags == null || !flags.contains(JobFlag.ENSURE_RUNNABLE)) {
+ return true;
+ }
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ return csm.getState() == ACTIVE || csm.getState() == REBALANCE_REQUIRED;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ed9d0c6..08972d4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -40,8 +40,6 @@
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -75,12 +73,6 @@
return Collections.emptySet();
}
- private void executeHyracksJob(JobSpecification spec) throws Exception {
- spec.setMaxReattempts(0);
- JobId jobId = hcc.startJob(spec);
- hcc.waitForCompletion(jobId);
- }
-
@Override
public void startGlobalRecovery(ICcApplicationContext appCtx) {
if (!recoveryCompleted && !recovering) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index f38c18b..1c17704 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -75,7 +75,7 @@
JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true);
spec.setJobletEventListenerFactory(jobEventListenerFactory);
- JobUtils.runJob(hcc, spec, true);
+ JobUtils.runJobIfActive(hcc, spec, true);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 180b6cc..cc9f1ad 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -305,7 +305,7 @@
private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, metadataProvider);
- JobUtils.runJob(hcc, spec, true);
+ JobUtils.forceRunJob(hcc, spec, true);
}
// Populates the data from the source dataset to the rebalance target dataset.
@@ -348,7 +348,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0);
// Executes the job.
- JobUtils.runJob(hcc, spec, true);
+ JobUtils.forceRunJob(hcc, spec, true);
}
private static ITupleProjectorFactory createTupleProjectorFactory(Dataset source, MetadataProvider metadataProvider)
@@ -403,7 +403,7 @@
EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), null));
}
for (JobSpecification jobSpec : jobs) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobUtils.forceRunJob(hcc, jobSpec, true);
}
}
@@ -427,12 +427,12 @@
// Creates the secondary index.
JobSpecification indexCreationJobSpec =
IndexUtil.buildSecondaryIndexCreationJobSpec(target, index, metadataProvider, null);
- JobUtils.runJob(hcc, indexCreationJobSpec, true);
+ JobUtils.forceRunJob(hcc, indexCreationJobSpec, true);
// Loads the secondary index.
JobSpecification indexLoadingJobSpec =
IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, metadataProvider, null);
- JobUtils.runJob(hcc, indexLoadingJobSpec, true);
+ JobUtils.forceRunJob(hcc, indexLoadingJobSpec, true);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index c1c6f18..e3f57f3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -36,12 +36,28 @@
ADDED_PENDINGOP_RECORD_TO_METADATA
}
- public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
+ public static JobId forceRunJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
throws Exception {
return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
}
- public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+ public static JobId runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
+ throws Exception {
+ return runJob(hcc, spec, EnumSet.of(JobFlag.ENSURE_RUNNABLE), waitForCompletion);
+ }
+
+ public static JobId runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+ boolean waitForCompletion) throws Exception {
+ if (jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+ return runJob(hcc, spec, jobFlags, waitForCompletion);
+ } else {
+ EnumSet<JobFlag> flags = EnumSet.copyOf(jobFlags);
+ flags.add(JobFlag.ENSURE_RUNNABLE);
+ return runJob(hcc, spec, flags, waitForCompletion);
+ }
+ }
+
+ private static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
boolean waitForCompletion) throws Exception {
spec.setMaxReattempts(0);
final JobId jobId = hcc.startJob(spec, jobFlags);
@@ -57,8 +73,12 @@
return jobId;
}
- public static Pair<JobId, List<IOperatorStats>> runJob(IHyracksClientConnection hcc, JobSpecification spec,
+ public static Pair<JobId, List<IOperatorStats>> runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec,
EnumSet<JobFlag> jobFlags, boolean waitForCompletion, List<String> statOperatorNames) throws Exception {
+ if (!jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+ jobFlags = EnumSet.copyOf(jobFlags);
+ jobFlags.add(JobFlag.ENSURE_RUNNABLE);
+ }
spec.setMaxReattempts(0);
final JobId jobId = hcc.startJob(spec, jobFlags);
List<IOperatorStats> opStats = null;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9aa5d84..132efbc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -403,7 +403,7 @@
// #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobUtils.runJobIfActive(hcc, jobSpec, true);
}
mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index ae903d1..236056c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -19,8 +19,14 @@
package org.apache.asterix.runtime.job.resource;
+import java.util.Set;
+
+import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
@@ -36,13 +42,19 @@
private static final Logger LOGGER = LogManager.getLogger();
private final IResourceManager resourceManager;
+ private final ICCApplication ccApp;
- public JobCapacityController(IResourceManager resourceManager) {
+ public JobCapacityController(IResourceManager resourceManager, ICCApplication ccApp) {
this.resourceManager = resourceManager;
+ this.ccApp = ccApp;
}
@Override
- public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+ public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags)
+ throws HyracksException {
+ if (!ccApp.acceptingJobs(jobFlags)) {
+ throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job);
+ }
IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
index 48c61b4..e70b306 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
@@ -22,8 +22,13 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -36,31 +41,34 @@
public class JobCapacityControllerTest {
+ private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
@Test
public void test() throws HyracksException {
+ JobId jobId = new JobId(0);
IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33);
- JobCapacityController capacityController = new JobCapacityController(resourceManager);
+ JobCapacityController capacityController = new JobCapacityController(resourceManager, makeCCApp());
// Verifies the correctness of the allocate method.
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+ Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L, 16), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 16), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+ Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 32), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
boolean exceedCapacity = false;
try {
- capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
+ capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64), jobId, none);
} catch (HyracksException e) {
exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
}
Assert.assertTrue(exceedCapacity);
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+ Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L, 32), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
exceedCapacity = false;
try {
- capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33));
+ capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33), jobId, none);
} catch (HyracksException e) {
exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
}
@@ -95,4 +103,9 @@
return clusterCapacity;
}
+ private ICCApplication makeCCApp() {
+ ICCApplication ccApp = mock(ICCApplication.class);
+ when(ccApp.acceptingJobs(none)).thenReturn(true);
+ return ccApp;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index 5cc8f69..6c22386 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.api.application;
+import java.util.Set;
+
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
@@ -37,4 +40,9 @@
* @return the job result callback
*/
IJobResultCallback getJobResultCallback();
+
+ /**
+ * @return true if the application is accepting jobs. False, otherwise.
+ */
+ boolean acceptingJobs(Set<JobFlag> jobFlags);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index be965eb..66f4fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -89,6 +89,66 @@
JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
/**
+ * Used to run a deployed Job Spec by id
+ *
+ * @param deployedJobSpecId
+ * The id of the deployed job spec
+ * @param jobParameters
+ * The serialized job parameters
+ * @throws Exception
+ */
+ JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param jobSpec
+ * Job Specification
+ * @throws Exception
+ */
+ JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param jobSpec
+ * Job Specification
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception;
+
+ /**
* Distribute the specified Job.
*
* @param jobSpec
@@ -118,28 +178,6 @@
void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
/**
- * Used to run a deployed Job Spec by id
- *
- * @param deployedJobSpecId
- * The id of the deployed job spec
- * @param jobParameters
- * The serialized job parameters
- * @throws Exception
- */
- JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
-
- /**
- * Start the specified Job.
- *
- * @param acggf
- * Activity Cluster Graph Generator Factory
- * @param jobFlags
- * Flags
- * @throws Exception
- */
- JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
-
- /**
* Gets the IP Address and port for the ResultDirectoryService wrapped in NetworkAddress
*
* @return {@link NetworkAddress}
@@ -196,44 +234,6 @@
void unDeployBinary(DeploymentId deploymentId) throws Exception;
/**
- * Start the specified Job.
- *
- * @param deploymentId
- * the id of the specific deployment
- * @param jobSpec
- * Job Specification
- * @throws Exception
- */
- JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
-
- /**
- * Start the specified Job.
- *
- * @param deploymentId
- * the id of the specific deployment
- * @param jobSpec
- * Job Specification
- * @param jobFlags
- * Flags
- * @throws Exception
- */
- JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
-
- /**
- * Start the specified Job.
- *
- * @param deploymentId
- * the id of the specific deployment
- * @param acggf
- * Activity Cluster Graph Generator Factory
- * @param jobFlags
- * Flags
- * @throws Exception
- */
- JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
- throws Exception;
-
- /**
* Shuts down all NCs and then the CC.
*
* @param terminateNCService
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index be47014..f63e0ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -42,6 +42,8 @@
public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
+ public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
public void cancelJob(JobId jobId) throws Exception;
public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
@@ -64,8 +66,6 @@
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
- public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
-
public JobInfo getJobInfo(JobId jobId) throws Exception;
public void stopCluster(boolean terminateNCService) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e46c0ef..766fb26 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -156,6 +156,7 @@
ILLEGAL_STATE(126),
INVALID_STRING_UNICODE(127),
UNSUPPORTED_WRITE_SPEC(128),
+ JOB_REJECTED(129),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 7225cd4..93848a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -20,5 +20,6 @@
public enum JobFlag {
PROFILE_RUNTIME,
- ENFORCE_CONTRACT
+ ENFORCE_CONTRACT,
+ ENSURE_RUNNABLE
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index b18bcb1..0bfdb37 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -19,6 +19,10 @@
package org.apache.hyracks.api.job.resource;
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
public class DefaultJobCapacityController implements IJobCapacityController {
@@ -34,7 +38,7 @@
}
@Override
- public JobSubmissionStatus allocate(JobSpecification job) {
+ public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) {
return JobSubmissionStatus.EXECUTE;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index f88baa2..f2c03f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -19,7 +19,11 @@
package org.apache.hyracks.api.job.resource;
+import java.util.Set;
+
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
/**
@@ -41,13 +45,14 @@
* Allocates required cluster capacity for a job.
*
* @param job,
- * the job specification.
- * @return EXECUTE, if the job can be executed immediately;
- * QUEUE, if the job cannot be executed
+ * the job specification.
+ * @param jobId
+ * the job id.
+ * @return EXECUTE, if the job can be executed immediately; QUEUE, if the job cannot be executed
* @throws HyracksException
- * if the job's capacity requirement exceeds the maximum capacity of the cluster.
+ * if the job's capacity requirement exceeds the maximum capacity of the cluster.
*/
- JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+ JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) throws HyracksException;
/**
* Releases cluster capacity for a job when it completes.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b3c2d7b..fa52bc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -146,6 +146,7 @@
126 = Illegal state. %1$s
127 = Decoding error - %1$s
128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
+129 = Job %1$s not run. Cluster is not accepting jobs
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index efd42c9..22e4bba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.cc;
import java.util.Arrays;
+import java.util.Set;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -26,6 +27,7 @@
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
@@ -125,4 +127,9 @@
// no op
};
}
+
+ @Override
+ public boolean acceptingJobs(Set<JobFlag> flags) {
+ return true;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index be3daae..7c7111f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -123,7 +123,7 @@
JobSpecification job = jobRun.getJobSpecification();
IJobCapacityController.JobSubmissionStatus status;
try {
- status = jobCapacityController.allocate(job);
+ status = jobCapacityController.allocate(job, jobRun.getJobId(), jobRun.getFlags());
CCServiceContext serviceCtx = ccs.getContext();
serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
switch (status) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 38277c2..d003853 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -90,7 +90,8 @@
// Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected
// or not.
try {
- IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+ IJobCapacityController.JobSubmissionStatus status =
+ jobCapacityController.allocate(job, run.getJobId(), run.getFlags());
// Checks if the job can be executed immediately.
if (status == IJobCapacityController.JobSubmissionStatus.EXECUTE) {
jobRuns.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 19340d0..3e87bb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -30,12 +30,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
@@ -57,6 +59,8 @@
public class JobManagerTest {
+ private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
private CCConfig ccConfig;
@Before
@@ -77,7 +81,8 @@
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
acceptedRuns.add(run);
@@ -93,7 +98,8 @@
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
@@ -109,7 +115,8 @@
JobRun run = mockJobRun(8193);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
jobManager.add(run);
} catch (HyracksException e) {
@@ -149,7 +156,7 @@
JobRun run = mockJobRun(1);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job))
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
jobManager.add(run);
} catch (HyracksException e) {
@@ -172,14 +179,16 @@
JobRun run1 = mockJobRun(1);
JobSpecification job1 = mock(JobSpecification.class);
when(run1.getJobSpecification()).thenReturn(job1);
- when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ when(jobCapacityController.allocate(job1, run1.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
jobManager.add(run1);
// A failure run.
JobRun run2 = mockJobRun(2);
JobSpecification job2 = mock(JobSpecification.class);
when(run2.getJobSpecification()).thenReturn(job2);
- when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job2, run2.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
jobManager.add(run2);
@@ -220,7 +229,8 @@
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
acceptedRuns.add(run);
@@ -236,7 +246,8 @@
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 7a75a0f..97e1533 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -247,7 +248,8 @@
private long maxRAM = Runtime.getRuntime().maxMemory();
@Override
- public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+ public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags)
+ throws HyracksException {
return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
}